Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2012-09-17 11:01:46 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2012-09-17 11:01:46 (GMT)
commit039f54130fdd472b0a40c834f67e2fc7248ad564 (patch)
treee49bacc21ccaa0e1e4f752a03e4d3e1c1bcc9fe9
parent105b730aa27977769375e6c09c50d7141eae0df2 (diff)
Polish design
* only RESTful and DBus API for IPC; * use SSE for subscriptions
-rwxr-xr-xexamples/activities.py25
-rwxr-xr-xexamples/client.py79
-rwxr-xr-xsugar-network-server13
-rwxr-xr-xsugar-network-service102
-rw-r--r--sugar_network/__init__.py16
-rw-r--r--sugar_network/client/__init__.py14
-rw-r--r--sugar_network/client/bus.py303
-rw-r--r--sugar_network/client/cursor.py293
-rw-r--r--sugar_network/client/objects.py191
-rw-r--r--sugar_network/local/__init__.py4
-rw-r--r--sugar_network/local/bus.py130
-rw-r--r--sugar_network/local/dbus_client.py (renamed from sugar_network/client/dbus_client.py)0
-rw-r--r--sugar_network/local/ipc_client.py30
-rw-r--r--sugar_network/local/mounts.py87
-rw-r--r--sugar_network/local/mountset.py33
-rw-r--r--sugar_network/node/__init__.py27
-rw-r--r--sugar_network/node/commands.py32
-rw-r--r--sugar_network/node/subscribe_socket.py97
-rw-r--r--sugar_network/resources/__init__.py2
-rw-r--r--sugar_network/resources/context.py14
-rw-r--r--sugar_network/resources/user.py21
-rw-r--r--sugar_network/resources/volume.py56
-rw-r--r--sugar_network/toolkit/dbus_thread.py8
-rw-r--r--sugar_network/toolkit/http.py315
-rw-r--r--sugar_network/toolkit/ipc.py76
-rw-r--r--sugar_network/toolkit/router.py (renamed from sugar_network/node/router.py)138
-rw-r--r--sugar_network/zerosugar/feeds.py5
-rw-r--r--sugar_network/zerosugar/injector.py19
-rw-r--r--tests/__init__.py24
-rw-r--r--tests/units/__main__.py25
-rwxr-xr-xtests/units/bus.py79
-rwxr-xr-xtests/units/dbus_client.py4
-rwxr-xr-xtests/units/dbus_datastore.py4
-rwxr-xr-xtests/units/home_mount.py335
-rwxr-xr-xtests/units/injector.py79
-rwxr-xr-xtests/units/ipc_client.py347
-rwxr-xr-xtests/units/mountset.py197
-rwxr-xr-xtests/units/node.py38
-rwxr-xr-xtests/units/node_mount.py392
-rwxr-xr-xtests/units/remote_mount.py508
-rwxr-xr-xtests/units/router.py23
-rwxr-xr-xtests/units/subscribe_socket.py152
-rwxr-xr-xtests/units/sync_master.py16
-rwxr-xr-xtests/units/sync_node.py22
-rwxr-xr-xtests/units/volume.py95
45 files changed, 1322 insertions, 3148 deletions
diff --git a/examples/activities.py b/examples/activities.py
deleted file mode 100755
index 875728c..0000000
--- a/examples/activities.py
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/usr/bin/env python
-
-import sugar_network
-
-
-def main():
- client = sugar_network.Client('/')
- client.checkin('com.ywwg.CartoonBuilderActivity')
-
-
-if __name__ == '__main__':
- import os
- from sugar_network import local
-
- os.system('sugar-network-service -DD start '
- '--local-root=tmp '
- '--activity-dirs=tmp/Activities '
- '--api-url=http://localhost:8000')
- try:
- local.local_root.value = 'tmp'
- local.api_url.value = 'http://localhost:8000'
- local.activity_dirs.value = ['tmp/Activities']
- main()
- finally:
- os.system('sugar-network-service --local-root=tmp stop')
diff --git a/examples/client.py b/examples/client.py
deleted file mode 100755
index 895306a..0000000
--- a/examples/client.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#!/usr/bin/env python
-
-import sugar_network
-
-
-def main():
- client = sugar_network.Client('~')
-
- guids = [None] * 3
- titles = ['Title1', 'Title2', 'Title3']
-
- print '-- Delete objects'
- for guid in [i['guid'] for i in client.Context.cursor()]:
- client.Context.delete(guid)
-
- def context_new(title):
- context = client.Context()
- context['type'] = 'activity'
- context['title'] = title
- context['summary'] = 'Description'
- context['description'] = 'Description'
- context.post()
- return context['guid']
-
- print '-- Create new objects'
- guids[0] = context_new(titles[0])
- assert guids[0]
- guids[1] = context_new(titles[1])
- assert guids[1] and guids[1] != guids[0]
- guids[2] = context_new(titles[2])
- assert guids[2] and guids[2] != guids[1] and guids[2] != guids[0]
-
- print '-- Browse using iterators'
- for i, obj in enumerate(client.Context.cursor()):
- assert i == obj.offset
- assert obj['guid'] == guids[i]
-
- print '-- Browse by offset'
- query = client.Context.cursor()
- for i in range(query.total):
- assert query[i]['guid'] == guids[i]
-
- print '-- Get objects directly'
- assert client.Context(guids[0], reply=['title'])['title'] == titles[0]
- assert client.Context(guids[1], reply=['title'])['title'] == titles[1]
- assert client.Context(guids[2], reply=['title'])['title'] == titles[2]
-
- print '-- Set BLOB property'
- client.Context(guids[1]).set_blob('icon', 'string')
-
- print '-- Get BLOB property'
- assert client.Context(guids[1]).get_blob('icon').read() == 'string'
-
- print '-- Query by property value'
- for obj in client.Context.cursor(title='Title2', reply=['guid', 'title']):
- assert obj['guid'] == guids[1]
- assert obj['title'] == titles[1]
-
- print '-- Full text search query'
- query = client.Context.cursor('Title1 OR Title3', reply=['guid', 'title'])
- assert query.total == 2
-
- assert sorted([(guids[0], titles[0]), (guids[2], titles[2])]) == \
- sorted([(i['guid'], i['title']) for i in query])
-
-
-if __name__ == '__main__':
- import os
- from sugar_network import local
-
- os.system('sugar-network-service -DD start '
- '--local-root=tmp '
- '--activity-dirs=tmp/Activities '
- '--api-url=http://localhost:8000')
- try:
- local.local_root.value = 'tmp'
- main()
- finally:
- os.system('sugar-network-service --local-root=tmp stop')
diff --git a/sugar-network-server b/sugar-network-server
index 1878faf..847f716 100755
--- a/sugar-network-server
+++ b/sugar-network-server
@@ -24,16 +24,15 @@ import active_document as ad
import sugar_network_webui as webui
from active_toolkit import coroutine, application
from active_toolkit.options import Option
-from sugar_network import node, local, Client
+from sugar_network import node, local
from sugar_network.local.mounts import LocalMount
from sugar_network.local.mountset import Mountset
from sugar_network.local.mounts import LocalMount
from sugar_network.node import stats
-from sugar_network.node.router import Router
from sugar_network.node.commands import MasterCommands
-from sugar_network.node.subscribe_socket import SubscribeSocket
from sugar_network.resources.volume import Volume
from sugar_network.toolkit import sugar, sneakernet
+from sugar_network.toolkit.router import Router
class Application(application.Daemon):
@@ -53,16 +52,13 @@ class Application(application.Daemon):
volume = Volume(node.data_root.value)
self.jobs.spawn(volume.populate)
- subscriber = SubscribeSocket(volume,
- node.host.value, node.subscribe_port.value)
- cp = MasterCommands(volume, subscriber)
+ cp = MasterCommands(volume)
logging.info('Listening for requests on %s:%s',
node.host.value, node.port.value)
server = coroutine.WSGIServer((node.host.value, node.port.value),
Router(cp), **ssl_args)
self.jobs.spawn(server.serve_forever)
- self.jobs.spawn(subscriber.serve_forever)
if webui.webui.value:
# XXX Until implementing regular web users
@@ -74,11 +70,10 @@ class Application(application.Daemon):
local.mounts_root.value = None
mountset = Mountset(None)
mountset['/'] = LocalMount(volume)
- Client.connection = mountset
host = (webui.webui_host.value, webui.webui_port.value)
logging.info('Start Web server on %s:%s port', *host)
- server = coroutine.WSGIServer(host, webui.get_app())
+ server = coroutine.WSGIServer(host, webui.get_app(mountset))
self.jobs.spawn(server.serve_forever)
try:
diff --git a/sugar-network-service b/sugar-network-service
index 7bb8d23..d461457 100755
--- a/sugar-network-service
+++ b/sugar-network-service
@@ -29,15 +29,15 @@ from os.path import join, abspath, exists
import active_document as ad
import sugar_network_webui as webui
-from sugar_network import toolkit, local, node, Client
+from sugar_network import toolkit, local, node
from sugar_network.toolkit import sugar, sneakernet
from sugar_network.toolkit import dbus_thread, mounts_monitor
from sugar_network.local import activities, datastore
from sugar_network.local.dbus_datastore import Datastore
from sugar_network.local.dbus_network import Network
-from sugar_network.local.bus import IPCServer
from sugar_network.local.mounts import HomeMount, RemoteMount
from sugar_network.local.mountset import Mountset
+from sugar_network.local.ipc_client import Router
from sugar_network.node import stats
from sugar_network.resources.volume import Volume
from active_toolkit.options import Option
@@ -154,72 +154,51 @@ class Application(application.Application):
@application.command(
'start service and log to files',
name='start', keep_stdout=True)
- def _start(self, minimal=False):
+ def _start(self):
if self.check_for_instance():
printf.info('%s is already run', self.name)
exit(1)
- if local.anonymous.value:
- sugar.uid = lambda: 'anonymous'
- sugar.nickname = lambda: 'anonymous'
- sugar.color = lambda: '#000000,#000000'
- else:
- # In case if it is new Sugar Shell profile
- toolkit.ensure_dsa_pubkey(sugar.profile_path('owner.key'))
-
jobs = coroutine.Pool()
-
- volume = Volume(self._db_path, lazy_open=local.lazy_open.value)
- mountset = Mountset(volume)
- mountset['~'] = HomeMount(volume)
- mountset['/'] = RemoteMount(volume)
-
- # Point this process code to `mountset` directly passing over IPC
- Client.connection = mountset
+ mountset = self._mountset()
def delayed_start(event=None):
logging.info('Proceed delayed start')
mountset.disconnect(delayed_start)
- volume.populate()
- datastore.populate(volume['artifact'])
+ mountset.home_volume.populate()
+ datastore.populate(mountset.home_volume['artifact'])
+ self._sync(mountset.home_volume)
- stale_keep_impls = []
- Context = Client('~').Context
- for context in Context.cursor(keep_impl=[1, 2]):
- if not activities.ensure_checkins(context.guid):
- stale_keep_impls.append(context.guid)
- for guid in stale_keep_impls:
- Context(guid, keep_impl=0).post()
+ logging.info('Listening for IPC requests on %s port',
+ local.ipc_port.value)
+ server = coroutine.WSGIServer(('localhost', local.ipc_port.value),
+ Router(mountset))
+ jobs.spawn(server.serve_forever)
- jobs.spawn(activities.monitor, volume['context'],
+ jobs.spawn(activities.monitor, mountset.home_volume['context'],
local.activity_dirs.value)
if webui.webui.value:
host = (webui.webui_host.value, webui.webui_port.value)
logging.info('Start Web server on %s:%s', *host)
- server = coroutine.WSGIServer(host, webui.get_app())
+ server = coroutine.WSGIServer(host, webui.get_app(mountset))
jobs.spawn(server.serve_forever)
if local.mounts_root.value:
mounts_monitor.start(abspath(local.mounts_root.value))
- if not minimal:
- if local.delayed_start.value:
- mountset.connect(delayed_start, event='delayed-start')
- else:
- delayed_start()
- dbus_thread.spawn_service(Datastore)
- dbus_thread.spawn_service(Network)
+ if local.delayed_start.value:
+ mountset.connect(delayed_start, event='delayed-start')
+ else:
+ delayed_start()
+ dbus_thread.spawn_service(Datastore)
+ dbus_thread.spawn_service(Network)
pid_path = self.new_instance()
try:
mountset.open()
- jobs.spawn(IPCServer(mountset).serve_forever)
- if minimal:
- jobs.join()
- else:
- dbus_thread.start(mountset)
+ dbus_thread.start(mountset)
except KeyboardInterrupt:
util.exception('%s interrupted', self.name)
except Exception:
@@ -230,6 +209,22 @@ class Application(application.Application):
mountset.close()
os.unlink(pid_path)
+ def _mountset(self):
+ if local.anonymous.value:
+ sugar.uid = lambda: 'anonymous'
+ sugar.nickname = lambda: 'anonymous'
+ sugar.color = lambda: '#000000,#000000'
+ else:
+ # In case if it is new Sugar Shell profile
+ toolkit.ensure_dsa_pubkey(sugar.profile_path('owner.key'))
+
+ volume = Volume(self._db_path, lazy_open=local.lazy_open.value)
+ mountset = Mountset(volume)
+ mountset['~'] = HomeMount(volume)
+ mountset['/'] = RemoteMount(volume)
+
+ return mountset
+
@contextmanager
def _rendezvous(self):
@@ -248,21 +243,26 @@ class Application(application.Application):
self._events['sync_complete'] = coroutine.Event()
- server = None
+ pid_path = None
+ mountset = None
try:
if not self.check_for_instance():
printf.info('%s is not started, ' \
'launch it for this command only', self.name)
- server = coroutine.spawn(self._start, True)
+ pid_path = self.new_instance()
+ mountset = self._mountset()
+ dbus_thread.spawn_service(Network)
+ coroutine.spawn(dbus_thread.start, mountset)
coroutine.dispatch()
- Client.connection.opened.wait()
+ mountset.opened.wait()
Client.connect(events_cb)
yield
finally:
- if server is not None:
- server.kill()
+ if mountset is not None:
+ mountset.close()
+ os.unlink(pid_path)
def _call(self, method, content=None):
kwargs = {}
@@ -300,6 +300,12 @@ class Application(application.Application):
raise
break
+ def _sync(self, volume):
+ docs, __ = volume['context'].find(limit=ad.MAX_LIMIT, keep_impl=[1, 2])
+ for context in docs:
+ if not activities.ensure_checkins(context.guid):
+ contexts.update(context.guid, keep_impl=0)
+
locale.setlocale(locale.LC_ALL, '')
@@ -314,7 +320,7 @@ local.tmpdir.value = sugar.profile_path('tmp')
Option.seek('main', [application.debug])
Option.seek('webui', webui)
Option.seek('local', local)
-Option.seek('node', [node.port, node.subscribe_port, node.sync_dirs])
+Option.seek('node', [node.port, node.sync_dirs])
Option.seek('stats', stats)
Option.seek('active-document', ad)
diff --git a/sugar_network/__init__.py b/sugar_network/__init__.py
index de95e09..7ace3f2 100644
--- a/sugar_network/__init__.py
+++ b/sugar_network/__init__.py
@@ -14,14 +14,26 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from sugar_network.toolkit import sugar
-from sugar_network.client.bus import Client, ServerError
from sugar_network.local.activities import checkins
from sugar_network.local import api_url, server_mode
from sugar_network_webui import webui_port
from sugar_network.zerosugar.injector import launch, checkin
+def Client(url=None, **kwargs):
+ from sugar_network.toolkit import http
+ if url is None:
+ url = api_url.value
+ return http.Client(url, **kwargs)
+
+
+def IPCClient(**kwargs):
+ from sugar_network.toolkit import http
+ from sugar_network.local import ipc_port
+ return http.Client('http://localhost:%s' % ipc_port.value, **kwargs)
+
+
def DBusClient(*args, **kwargs):
# Avoid importing dbus related modules by default
- from sugar_network.client import dbus_client
+ from sugar_network.local import dbus_client
return dbus_client.DBusClient(*args, **kwargs)
diff --git a/sugar_network/client/__init__.py b/sugar_network/client/__init__.py
deleted file mode 100644
index ee29047..0000000
--- a/sugar_network/client/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
diff --git a/sugar_network/client/bus.py b/sugar_network/client/bus.py
deleted file mode 100644
index 2fb4b68..0000000
--- a/sugar_network/client/bus.py
+++ /dev/null
@@ -1,303 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import json
-import socket
-import logging
-from contextlib import contextmanager
-
-import active_document as ad
-from active_toolkit import util, coroutine, sockets
-from sugar_network import local
-from sugar_network.toolkit import ipc, sugar
-
-
-_CONNECTION_POOL = 6
-
-_logger = logging.getLogger('sugar_network')
-
-
-class ServerError(RuntimeError):
- pass
-
-
-class classproperty(property):
-
- def __get__(self, obj, type_):
- # pylint: disable-msg=E1101
- return self.fget.__get__(None, type_)()
-
-
-class Client(object):
- """IPC class to get access from a client side.
-
- See http://wiki.sugarlabs.org/go/Platform_Team/Sugar_Network/Client
- for detailed information.
-
- """
- _connection = None
-
- @classproperty
- @classmethod
- def connection(cls):
- if cls._connection is None:
- cls._connection = _Connection()
- return cls._connection
-
- @classmethod
- def close(cls):
- cls._connection = None
-
- @classmethod
- def call(cls, method, cmd=None, content=None,
- content_type='application/json', **kwargs):
- request = ad.Request(kwargs)
- request.access_level = ad.ACCESS_LOCAL
- request.principal = sugar.uid()
- request['method'] = method
- if cmd:
- request['cmd'] = cmd
- request.content = content
- request.content_type = content_type
- return cls.connection.call(request)
-
- @classmethod
- def publish(cls, event, **kwargs):
- kwargs['event'] = event
- cls.connection.publish(kwargs)
-
- @classmethod
- def connect(cls, callback, **condition):
- cls.connection.connect(callback, condition)
-
- @classmethod
- def disconnect(cls, callback):
- cls.connection.disconnect(callback)
-
- @classmethod
- def subscribe(cls):
- """Start subscription session.
-
- :returns:
- `SocketFile` object connected to IPC server to read events from
-
- """
- ipc.rendezvous()
- # pylint: disable-msg=E1101
- conn = sockets.SocketFile(coroutine.socket(socket.AF_UNIX))
- conn.connect(local.ensure_path('run', 'subscribe'))
- return conn
-
- @classmethod
- def mounts(cls):
- return cls.call('GET', 'mounts')
-
- def __init__(self, mountpoint):
- self._mountpoint = mountpoint
- self._resources = {}
-
- @property
- def connected(self):
- _logger.warning('Client.connected is depecated, '
- 'use Client.mounted instead')
- return self.call('GET', 'mounted', mountpoint=self._mountpoint)
-
- @property
- def mounted(self):
- return self.call('GET', 'mounted', mountpoint=self._mountpoint)
-
- def launch(self, context, command='activity', object_id=None, uri=None,
- args=None):
- """Launch context implementation.
-
- Function will call fork at the beginning. In forked process,
- it will try to choose proper implementation to execute and launch it.
-
- Execution log will be stored in `~/.sugar/PROFILE/logs` directory.
-
- :param context:
- context GUID to look for implementations
- :param command:
- command that selected implementation should support
- :param object_id:
- optional id to restore Journal object
- :param uri:
- optional uri to open; if implementation supports it
- :param args:
- optional list of arguments to pass to launching implementation
-
- """
- # TODO Make a diference in launching from "~" and "/" mounts
- self.publish('launch', mountpoint=self._mountpoint, context=context,
- command=command, object_id=object_id, uri=uri, args=args)
-
- def __getattr__(self, name):
- """Class-like object to access to a resource or call a method.
-
- :param name:
- resource name started with capital char
- :returns:
- a class-like resource object
-
- """
- resource = self._resources.get(name)
- if resource is None:
- resource = _Resource(self._mountpoint, name.lower())
- self._resources[name] = resource
- return resource
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.close()
-
-
-class _Connection(object):
-
- def __init__(self):
- self._pool = coroutine.Queue(maxsize=_CONNECTION_POOL)
- self._pool_size = 0
- self._subscribe_job = None
- self._subscriptions = {}
-
- def call(self, request, response=None):
- with self._pipe() as pipe:
- request['content_type'] = request.content_type
- pipe.write_message(request)
- if request.content_type == 'application/json':
- request.content = json.dumps(request.content)
- pipe.write(request.content)
- reply = pipe.read_message()
-
- _logger.debug('Made a call: request=%r reply=%r', request, reply)
-
- if type(reply) is dict and 'error' in reply:
- raise ServerError(reply['error'])
-
- return reply
-
- def connect(self, callback, condition=None):
- if self._subscribe_job is None:
- self._subscribe_job = coroutine.spawn(self._subscribe)
- self._subscriptions[callback] = condition or {}
-
- def disconnect(self, callback):
- if callback in self._subscriptions:
- del self._subscriptions[callback]
-
- def publish(self, event):
- request = ad.Request()
- request['method'] = 'POST'
- request['cmd'] = 'publish'
- request.content = event
- request.content_type = 'application/json'
- self.call(request)
-
- def close(self):
- if self._subscribe_job is not None:
- _logger.debug('Stop waiting for events')
- self._subscribe_job.kill()
- self._subscribe_job = None
-
- while not self._pool.empty():
- conn = self._pool.get_nowait()
- try:
- _logger.debug('Close IPC connection: %r', conn)
- conn.close()
- except Exception:
- util.exception(_logger, 'Cannot close IPC connection')
-
- @contextmanager
- def _pipe(self):
- if self._pool.qsize() or self._pool_size >= self._pool.maxsize:
- conn = self._pool.get()
- else:
- self._pool_size += 1
- ipc.rendezvous()
- # pylint: disable-msg=E1101
- conn = coroutine.socket(socket.AF_UNIX)
- conn.connect(local.ensure_path('run', 'accept'))
- _logger.debug('Open new IPC connection: %r', conn)
- try:
- yield sockets.SocketFile(conn)
- finally:
- self._pool.put(conn)
-
- def _subscribe(self):
- _logger.debug('Start waiting for events')
-
- conn = Client.subscribe()
- try:
- while True:
- coroutine.select([conn.fileno()], [], [])
- event = conn.read_message()
- if event is None:
- break
- for callback, condition in self._subscriptions.items():
- for key, value in condition.items():
- if event.get(key) != value:
- break
- else:
- callback(event)
- finally:
- conn.close()
-
-
-class _Resource(object):
-
- def __init__(self, mountpoint, name):
- self.mountpoint = mountpoint
- self.document = name
-
- def cursor(self, query=None, order_by=None, reply=None, page_size=18,
- **filters):
- """Query resource objects.
-
- :param query:
- full text search query string in Xapian format
- :param order_by:
- name of property to sort by; might be prefixed by either `+` or `-`
- to change order's direction
- :param reply:
- list of property names to return for found objects;
- by default, only GUIDs will be returned; for missed properties,
- will be sent additional requests to a server on getting access
- to particular object.
- :param page_size:
- number of items in one cached page, there are might be several
- (at least two) pages
- :param filters:
- a dictionary of properties to filter resulting list
-
- """
- from sugar_network.client.cursor import Cursor
- return Cursor(self.mountpoint, self.document, query, order_by, reply,
- page_size, **filters)
-
- def delete(self, guid):
- """Delete resource object.
-
- :param guid:
- resource object's GUID
-
- """
- return Client.call('DELETE',
- mountpoint=self.mountpoint, document=self.document, guid=guid)
-
- def __call__(self, guid=None, reply=None, **kwargs):
- from sugar_network.client.objects import Object
- return Object(self.mountpoint, self.document, reply or [], guid,
- **kwargs)
diff --git a/sugar_network/client/cursor.py b/sugar_network/client/cursor.py
deleted file mode 100644
index 311a117..0000000
--- a/sugar_network/client/cursor.py
+++ /dev/null
@@ -1,293 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import logging
-import collections
-
-from sugar_network.client.bus import Client
-from sugar_network.client.objects import Object
-from active_toolkit import coroutine, util, enforce
-
-
-_QUERY_PAGES_NUMBER = 2
-
-
-_logger = logging.getLogger('sugar_network.cursor')
-
-
-class Cursor(object):
-
- def __init__(self, mountpoint, document, query, order_by, reply, page_size,
- **filters):
- self.mountpoint = mountpoint
- self.document = document
- self._query = query
- self._order_by = order_by
- self._reply = reply or ['guid']
- if 'guid' not in self._reply:
- self._reply.append('guid')
- self._page_size = page_size
- self._filters = filters or {}
- self._total = None
- self._page_access = collections.deque([], _QUERY_PAGES_NUMBER)
- self._pages = {}
- self._offset = -1
- self._wait_session = None
-
- Client.connect(self.__event_cb)
-
- def close(self):
- Client.disconnect(self.__event_cb)
-
- # pylint: disable-msg=E1101,E0102,E0202
- @property
- def offset(self):
- """Current position in query results."""
- return self._offset
-
- @offset.setter
- def offset(self, value):
- """Change current position in query results."""
- self._offset = max(-1, value)
-
- @property
- def total(self):
- """Total number of objects."""
- if self._total is None:
- if not self._fetch_page(0):
- return 0
- return self._total
-
- @property
- def order_by(self):
- """Current order of resulting list.
-
- Name of property to sort by. Might be prefixed by either `+` or `-`
- to change order's direction.
-
- """
- return self._order_by
-
- # pylint: disable-msg=E1101,E0102
- @order_by.setter
- def order_by(self, value):
- if self._order_by == value:
- return
- self._order_by = value
- self._reset()
-
- def read_events(self):
- if self._wait_session is None:
- self._wait_session = _WaitSession()
-
- with self._wait_session as session:
- while session.wait():
- for event in session:
- if event['event'] == 'commit':
- # TODO If cursor formed by fulltext query,
- # it should refreshed as well
- continue
- # TODO Replace by changed offset
- self._reset()
- yield None
-
- def __iter__(self):
- while self.offset + 1 < self.total:
- self.offset += 1
- obj = self.get(self.offset)
- if obj is None:
- break
- yield obj
- self.offset = -1
-
- def filter(self, query=None, **filters):
- """Change query parameters.
-
- :param query:
- full text search query string in Xapian format
- :param filters:
- a dictionary of properties to filter resulting list
-
- """
- if query == self._query and filters == self._filters:
- return
- self._query = query
- self._filters = filters
- self._reset()
-
- def update_filter(self, query=None, **filters):
- """Update query parameters applying them on top of existing.
-
- :param query:
- full text search query string in Xapian format
- :param filters:
- a dictionary of properties to filter resulting list
-
- """
- new_filters = self._filters.copy()
- new_filters.update(filters)
- self.filter(query, **new_filters)
-
- def get(self, key, default=None):
- """Get either object by key or default value.
-
- :param key:
- `key` value might be an `int` value (offset within the cursor),
- or a string to treat it as GUID
- :param default:
- value to return if key if not found
- :returns:
- `Object` value or `default`
-
- """
- if type(key) is not int:
- for page in self._pages.values():
- for obj in page:
- if obj is not None and obj.guid == key:
- return obj
- return Object(self.mountpoint, self.document, self._reply, key)
- else:
- offset = key
-
- if offset < 0 or self._total is not None and \
- (offset >= self._total):
- return default
-
- page = offset / self._page_size
- offset -= page * self._page_size
-
- if page not in self._pages:
- if not self._fetch_page(page):
- return default
-
- if offset >= len(self._pages[page]):
- total = page + len(self._pages[page])
- _logger.warning('Miscalculated total number, %s instead of %s',
- total, self._total)
- self._total = total
- return default
-
- return self._pages[page][offset]
-
- def __getitem__(self, key):
- """Get object by key.
-
- :param key:
- `key` value might be an `int` value (offset within the cursor),
- or a string to treat it as GUID
- :returns:
- `Object` value or raise `KeyError` exception if key is not found
-
- """
- result = self.get(key)
- enforce(result is not None, KeyError, 'Key is out of range')
- return result
-
- def _fetch_page(self, page):
- offset = page * self._page_size
-
- params = {
- 'mountpoint': self.mountpoint,
- 'document': self.document,
- }
- for key, value in self._filters.items():
- if value is not None:
- params[key] = value
- params['offset'] = offset
- params['limit'] = self._page_size
- if self._query:
- params['query'] = self._query
- if self._order_by:
- params['order_by'] = self._order_by
- if self._reply:
- params['reply'] = self._reply
-
- try:
- response = Client.call('GET', **params)
- self._total = response['total']
- except Exception:
- util.exception(_logger, 'Failed to fetch %r query', params)
- self._total = None
- return False
-
- result = [None] * len(response['result'])
- for i, props in enumerate(response['result']):
- result[i] = Object(self.mountpoint, self.document, self._reply,
- props['guid'], props, offset + i)
-
- if not self._page_access or self._page_access[-1] != page:
- if len(self._page_access) == _QUERY_PAGES_NUMBER:
- del self._pages[self._page_access[0]]
- self._page_access.append(page)
- self._pages[page] = result
-
- return True
-
- def _reset(self):
- self._page_access.clear()
- self._pages.clear()
- self._total = None
-
- def __event_cb(self, event):
- mountpoint = event.get('mountpoint')
- document = event.get('document')
- if document == self.document and mountpoint == self.mountpoint or \
- document == self.document and not mountpoint or \
- mountpoint == self.mountpoint and \
- event['event'] in ('mount', 'unmount'):
- self._reset()
- if self._wait_session is not None:
- self._wait_session.push(event)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.close()
-
-
-class _WaitSession(object):
-
- def __init__(self):
- self._signal = coroutine.Event()
- self._users = 0
- self._queue = collections.deque()
-
- def __enter__(self):
- if self._users:
- # Break previous waiting session,
- # only one `wait()` should exist for one session
- # TODO Support multiple clients
- self._signal.set()
- self._signal.clear()
- self._users += 1
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- self._users -= 1
-
- def push(self, event):
- if self._users > 0:
- self._queue.append(event)
- self._signal.set()
- self._signal.clear()
-
- def wait(self):
- self._queue.clear()
- self._signal.wait()
- return bool(self._queue)
-
- def __iter__(self):
- return iter(self._queue)
diff --git a/sugar_network/client/objects.py b/sugar_network/client/objects.py
deleted file mode 100644
index 4b64f2f..0000000
--- a/sugar_network/client/objects.py
+++ /dev/null
@@ -1,191 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import logging
-from cStringIO import StringIO
-from os.path import isdir, abspath
-
-from sugar_network.client.bus import Client
-from sugar_network.toolkit import sugar
-from active_toolkit import enforce
-
-
-_logger = logging.getLogger('sugar_network.objects')
-
-
-class Object(object):
-
- def __init__(self, mountpoint, document, reply, guid=None, props=None,
- offset=None, **kwargs):
- self.mountpoint = mountpoint
- self.document = document
- self._reply = reply or []
- self._guid = guid
- self._props = props or {}
- self._blobs = {}
- self._dirty = set()
- self.offset = offset
-
- for prop, value in kwargs.items():
- self[prop] = value
-
- @property
- def guid(self):
- return self._guid
-
- def get(self, prop):
- if prop == 'guid':
- return self._guid
- result = self._props.get(prop)
- if result is None:
- enforce(prop in self._reply,
- 'Access to not requested %r property in %r from %r mount',
- prop, self.document, self.mountpoint)
- self.fetch()
- result = self._props.get(prop)
- return result
-
- def fetch(self, props=None):
- enforce(self._guid, 'Object needs to be posted first')
-
- to_fetch = []
- for prop in (props or self._reply):
- if prop not in self._props:
- to_fetch.append(prop)
- if not to_fetch:
- return
-
- response = Client.call('GET', mountpoint=self.mountpoint,
- document=self.document, guid=self._guid, reply=to_fetch)
- response.update(self._props)
- self._props = response
-
- def post(self):
- if not self._dirty:
- return
-
- props = {}
- for i in self._dirty:
- props[i] = self._props.get(i)
-
- if self._guid:
- Client.call('PUT', mountpoint=self.mountpoint,
- document=self.document, guid=self._guid, content=props,
- content_type='application/json')
- else:
- props['user'] = [sugar.uid()]
- self._guid = Client.call('POST', mountpoint=self.mountpoint,
- document=self.document, content=props,
- content_type='application/json')
-
- self._dirty.clear()
- return self._guid
-
- def get_blob_path(self, prop):
- blob, is_path = self._get_blob(prop)
- if is_path:
- return blob['path'], blob['mime_type']
- else:
- return None, None
-
- def get_blob(self, prop):
- blob, is_path = self._get_blob(prop)
- if is_path:
- path = blob['path']
- if path is None:
- return _empty_blob
- enforce(not isdir(path), 'Requested BLOB is a dictionary')
- return _Blob(path, blob['mime_type'])
- elif blob is not None:
- return _StringIO(blob.encode('utf8'))
- else:
- return _empty_blob
-
- def upload_blob(self, prop, path, pass_ownership=False):
- enforce(self._guid, 'Object needs to be posted first')
- Client.call('PUT', 'upload_blob', mountpoint=self.mountpoint,
- document=self.document, guid=self._guid, prop=prop,
- path=abspath(path), pass_ownership=pass_ownership)
-
- def _get_blob(self, prop):
- blob = self._blobs.get(prop)
- if blob is None:
- blob = Client.call('GET', 'get_blob', mountpoint=self.mountpoint,
- document=self.document, guid=self._guid, prop=prop)
- self._blobs[prop] = blob
- return blob, isinstance(blob, dict) and 'path' in blob
-
- def __getitem__(self, prop):
- result = self.get(prop)
- enforce(result is not None, KeyError,
- 'Property %r is absent in %r from %r mount',
- prop, self.document, self.mountpoint)
- return result
-
- def __setitem__(self, prop, value):
- enforce(prop != 'guid', 'Property "guid" is read-only')
- if self._props.get(prop) == value:
- return
- self._props[prop] = value
- self._dirty.add(prop)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.post()
-
-
-class _Blob(file):
-
- def __init__(self, path, mime_type):
- file.__init__(self, path)
- self.mime_type = mime_type
-
-
-class _EmptyBlob(object):
-
- closed = True
- mime_type = 'application/octet-stream'
-
- def read(self, size=None):
- return ''
-
- def close(self):
- pass
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- pass
-
-
-class _StringIO(object):
-
- def __init__(self, *args, **kwargs):
- self._stream = StringIO(*args, **kwargs)
-
- def __getattr__(self, name):
- return getattr(self._stream, name)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.close()
-
-
-_empty_blob = _EmptyBlob()
diff --git a/sugar_network/local/__init__.py b/sugar_network/local/__init__.py
index 36ed8df..e02cc1a 100644
--- a/sugar_network/local/__init__.py
+++ b/sugar_network/local/__init__.py
@@ -74,6 +74,10 @@ anonymous = Option(
'only read-only operations are available in this mode',
default=False, type_cast=Option.bool_cast, action='store_true')
+ipc_port = Option(
+ 'port number to listen for incomming connections from IPC clients',
+ default=5001, type_cast=int)
+
def path(*args):
"""Calculate a path from the root.
diff --git a/sugar_network/local/bus.py b/sugar_network/local/bus.py
deleted file mode 100644
index 5d57840..0000000
--- a/sugar_network/local/bus.py
+++ /dev/null
@@ -1,130 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import json
-import errno
-import socket
-import logging
-from os.path import exists
-
-import active_document as ad
-from sugar_network import local
-from sugar_network.toolkit import ipc, sugar
-from active_toolkit import coroutine, sockets, util
-
-
-_logger = logging.getLogger('local.bus')
-
-
-class IPCServer(object):
-
- def __init__(self, mounts):
- self._subscriptions = []
- self._mounts = mounts
- self._publish_lock = coroutine.Lock()
- self._servers = coroutine.Pool()
-
- self._mounts.connect(self._republish)
-
- self._servers.spawn(
- _start_server('accept', self._serve_client))
- self._servers.spawn(
- _start_server('subscribe', self._serve_subscription))
-
- def serve_forever(self):
- # Clients write to rendezvous named pipe, in block mode,
- # to make sure that server is started
- rendezvous = ipc.rendezvous(server=True)
- try:
- self._servers.join()
- except KeyboardInterrupt:
- pass
- finally:
- os.close(rendezvous)
-
- def stop(self):
- while self._subscriptions:
- self._subscriptions.pop().close()
- self._servers.kill()
-
- def _serve_client(self, conn_file):
- while True:
- message = conn_file.read_message()
- if message is None:
- break
- try:
- request = ad.Request(message)
- request.principal = sugar.uid()
- request.access_level = ad.ACCESS_LOCAL
-
- content_type = request.pop('content_type')
- if content_type == 'application/json':
- request.content = json.loads(conn_file.read())
- elif content_type:
- request.content_stream = conn_file
- else:
- request.content = conn_file.read() or None
-
- response = ad.Response()
- result = self._mounts.call(request, response)
- conn_file.write_message(result)
-
- except Exception, error:
- conn_file.write_message({'error': str(error)})
-
- def _serve_subscription(self, conn_file):
- _logger.debug('Added new %r subscription', conn_file)
- self._subscriptions.append(conn_file)
- return True
-
- def _republish(self, event):
- _logger.debug('Send notification: %r', event)
-
- with self._publish_lock:
- for sock in self._subscriptions[:]:
- try:
- sock.write_message(event)
- except socket.error, error:
- if error.errno == errno.EPIPE:
- _logger.debug('Lost %r subscription', sock)
- self._subscriptions.remove(sock)
- else:
- util.exception(_logger,
- 'Failed to deliver event via %r', sock)
-
-
-def _start_server(name, serve_cb):
- accept_path = local.ensure_path('run', name)
- if exists(accept_path):
- os.unlink(accept_path)
-
- # pylint: disable-msg=E1101
- accept = coroutine.socket(socket.AF_UNIX)
- accept.bind(accept_path)
- accept.listen(5)
-
- def connection_cb(conn, address):
- conn_file = sockets.SocketFile(conn)
- _logger.debug('New %r connection: %r', name, conn_file)
- do_not_close = False
- try:
- do_not_close = serve_cb(conn_file)
- finally:
- _logger.debug('Quit %r connection: %r', name, conn_file)
- if not do_not_close:
- conn_file.close()
-
- return coroutine.Server(accept, connection_cb).serve_forever
diff --git a/sugar_network/client/dbus_client.py b/sugar_network/local/dbus_client.py
index a8682da..a8682da 100644
--- a/sugar_network/client/dbus_client.py
+++ b/sugar_network/local/dbus_client.py
diff --git a/sugar_network/local/ipc_client.py b/sugar_network/local/ipc_client.py
new file mode 100644
index 0000000..20a13aa
--- /dev/null
+++ b/sugar_network/local/ipc_client.py
@@ -0,0 +1,30 @@
+# Copyright (C) 2012 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import active_document as ad
+
+from sugar_network.toolkit import router
+from sugar_network.local import sugar
+
+
+class Router(router.Router):
+
+ def authenticate(self, request):
+ return sugar.uid()
+
+ def call(self, request, response):
+ request.access_level = ad.ACCESS_LOCAL
+ response.content_type = 'application/json'
+ return router.Router.call(self, request, response)
diff --git a/sugar_network/local/mounts.py b/sugar_network/local/mounts.py
index 747d3fa..d0297b5 100644
--- a/sugar_network/local/mounts.py
+++ b/sugar_network/local/mounts.py
@@ -17,7 +17,6 @@ import os
import json
import shutil
import logging
-from urlparse import urlparse
from os.path import isabs, exists, join, basename, isdir
from gettext import gettext as _
@@ -27,7 +26,7 @@ from sweets_recipe import Bundle
from sugar_network.toolkit import sugar, http
from sugar_network.local import activities, cache
from sugar_network import local, checkin, sugar
-from active_toolkit import sockets, util, coroutine, enforce
+from active_toolkit import util, coroutine, enforce
_LOCAL_PROPS = {
@@ -105,8 +104,7 @@ class LocalMount(ad.VolumeCommands, _Mount):
ad.VolumeCommands.before_create(self, request, props)
def _events_cb(self, event):
- if 'mountpoint' not in event:
- event['mountpoint'] = self.mountpoint
+ event['mountpoint'] = self.mountpoint
self.publish(event)
@@ -137,6 +135,7 @@ class HomeMount(LocalMount):
util.exception(_logger, 'Failed to read %r spec file', path)
continue
+ print '>', request.access_level == ad.ACCESS_LOCAL
if request.access_level == ad.ACCESS_LOCAL:
impl_id = spec.root
else:
@@ -295,6 +294,7 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands):
_Mount.__init__(self)
_ProxyCommands.__init__(self, home_volume)
+ self._client = None
self._seqno = 0
self._remote_volume_guid = None
self._api_urls = []
@@ -308,24 +308,7 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands):
try:
return ad.CommandsProcessor.call(self, request, response)
except ad.CommandNotFound:
- pass
-
- method = request.pop('method')
- document = request.pop('document')
- guid = request.pop('guid') if 'guid' in request else None
- prop = request.pop('prop') if 'prop' in request else None
-
- path = [document]
- if guid:
- path.append(guid)
- if prop:
- path.append(prop)
-
- return http.request(method, path, data=request.content,
- params=request, headers={
- 'Content-Type': 'application/json',
- 'Accept-Language': ','.join(request.accept_language),
- })
+ return self._client.call(request)
return self._proxy_call(request, response, super_call)
@@ -340,7 +323,7 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands):
def get_blob(self, document, guid, prop):
def download(path, seqno):
- return http.download([document, guid, prop], path, seqno,
+ return self._client.download([document, guid, prop], path, seqno,
document == 'implementation' and prop == 'data')
return cache.get_blob(document, guid, prop, self._seqno,
@@ -352,7 +335,8 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands):
try:
with file(path, 'rb') as f:
- http.request('PUT', [document, guid, prop], files={'file': f})
+ self._client.request('PUT', [document, guid, prop],
+ files={'file': f})
finally:
if pass_ownership and exists(path):
os.unlink(path)
@@ -365,49 +349,30 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands):
self._connections.spawn(self._connect)
def _connect(self):
-
- def connect(url):
- _logger.debug('Connecting to %r master', url)
- local.api_url.value = url
- subscription = http.request('POST', [''],
- params={'cmd': 'subscribe'},
- headers={'Content-Type': 'application/json'})
- conn = sockets.SocketFile(coroutine.socket())
- conn.connect((urlparse(url).hostname, subscription['port']))
- conn.write_message({'ticket': subscription['ticket']})
- return conn
-
- def listen_events(url, conn):
- stat = http.request('GET', [], params={'cmd': 'stat'},
- headers={'Content-Type': 'application/json'})
- # pylint: disable-msg=E1103
- self._seqno = stat.get('seqno') or 0
- self._remote_volume_guid = stat.get('guid')
-
- _logger.info('Connected to %r master', url)
- _Mount.set_mounted(self, True)
-
- while True:
- coroutine.select([conn.fileno()], [], [])
- event = conn.read_message()
- if event is None:
- break
-
- seqno = event.get('seqno')
- if seqno:
- self._seqno = seqno
-
- event['mountpoint'] = self.mountpoint
- self.publish(event)
-
for url in self._api_urls:
try:
- conn = connect(url)
+ _logger.debug('Connecting to %r master', url)
+ self._client = http.Client(url)
+ subscription = self._client.subscribe()
except Exception:
util.exception(_logger, 'Cannot connect to %r master', url)
continue
+
try:
- listen_events(url, conn)
+ stat = self._client.get(cmd='stat')
+ # pylint: disable-msg=E1103
+ self._seqno = stat.get('seqno') or 0
+ self._remote_volume_guid = stat.get('guid')
+
+ _logger.info('Connected to %r master', url)
+ _Mount.set_mounted(self, True)
+
+ for event in subscription:
+ seqno = event.get('seqno')
+ if seqno:
+ self._seqno = seqno
+ event['mountpoint'] = self.mountpoint
+ self.publish(event)
except Exception:
util.exception(_logger, 'Failed to dispatch remote event')
finally:
diff --git a/sugar_network/local/mountset.py b/sugar_network/local/mountset.py
index 175bf02..8fd63a1 100644
--- a/sugar_network/local/mountset.py
+++ b/sugar_network/local/mountset.py
@@ -21,12 +21,11 @@ import active_document as ad
from sugar_network import local, node
from sugar_network.toolkit import zeroconf, netlink, network, mounts_monitor
+from sugar_network.toolkit.router import Router
from sugar_network.local.mounts import LocalMount, NodeMount
-from sugar_network.node.subscribe_socket import SubscribeSocket
from sugar_network.node.commands import NodeCommands
from sugar_network.node.sync_node import SyncCommands
-from sugar_network.node.router import Router
-from sugar_network.resources.volume import Volume
+from sugar_network.resources.volume import Volume, Commands
from active_toolkit import util, coroutine, enforce
@@ -35,13 +34,9 @@ _DB_DIRNAME = '.sugar-network'
_logger = logging.getLogger('local.mountset')
-class Mountset(dict, ad.CommandsProcessor, SyncCommands):
+class Mountset(dict, ad.CommandsProcessor, Commands, SyncCommands):
def __init__(self, home_volume):
- dict.__init__(self)
- ad.CommandsProcessor.__init__(self)
- SyncCommands.__init__(self, local.path('sync'))
-
self.opened = coroutine.Event()
self.home_volume = home_volume
self._subscriptions = {}
@@ -49,6 +44,11 @@ class Mountset(dict, ad.CommandsProcessor, SyncCommands):
self._jobs = coroutine.Pool()
self._servers = coroutine.Pool()
+ dict.__init__(self)
+ ad.CommandsProcessor.__init__(self)
+ SyncCommands.__init__(self, local.path('sync'))
+ Commands.__init__(self)
+
def __getitem__(self, mountpoint):
enforce(mountpoint in self, 'Unknown mountpoint %r', mountpoint)
return self.get(mountpoint)
@@ -133,22 +133,18 @@ class Mountset(dict, ad.CommandsProcessor, SyncCommands):
return list(requires)
- @ad.volume_command(method='POST', cmd='publish')
- def republish(self, request):
- self.publish(request.content)
-
def call(self, request, response=None):
if response is None:
response = ad.Response()
request.accept_language = [self._lang]
- mountpoint = request.get('mountpoint')
+ mountpoint = request.get('mountpoint') or '/'
try:
try:
result = ad.CommandsProcessor.call(self, request, response)
except ad.CommandNotFound:
- enforce('mountpoint' in request, 'No \'mountpoint\' argument')
- request.pop('mountpoint')
+ if 'mountpoint' in request:
+ request.pop('mountpoint')
mount = self[mountpoint]
if mountpoint == '/':
mount.set_mounted(True)
@@ -254,16 +250,11 @@ class Mountset(dict, ad.CommandsProcessor, SyncCommands):
self._jobs.spawn(volume.populate)
if server_mode:
- subscriber = SubscribeSocket(volume,
- node.host.value, node.subscribe_port.value)
- cp = NodeCommands(volume, subscriber)
-
_logger.info('Start %r server on %s port',
volume.root, node.port.value)
server = coroutine.WSGIServer(('0.0.0.0', node.port.value),
- Router(cp))
+ Router(NodeCommands(volume)))
self._servers.spawn(server.serve_forever)
- self._servers.spawn(subscriber.serve_forever)
# Let servers start before publishing mount event
coroutine.dispatch()
diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py
index 94499d2..e73e2d3 100644
--- a/sugar_network/node/__init__.py
+++ b/sugar_network/node/__init__.py
@@ -25,10 +25,6 @@ port = Option(
'port number to listen incomming connections',
default=8000, type_cast=int, name='port')
-subscribe_port = Option(
- 'port number to listen incomming subscribtion requests',
- default=8001, type_cast=int, name='subscribe_port')
-
keyfile = Option(
'path to SSL certificate keyfile to serve requests via HTTPS',
name='keyfile')
@@ -47,11 +43,6 @@ data_root = Option(
'path to a directory to place server data',
default='/var/lib/sugar-network', name='data_root')
-only_commit_events = Option(
- 'subscribers can be notified only with "commit" events; '
- 'that is useful to minimize interactions between server and clients',
- default=False, type_cast=Option.bool_cast, action='store_true')
-
find_limit = Option(
'limit the resulting list for search requests',
default=32, type_cast=int)
@@ -70,21 +61,3 @@ pull_timeout = Option(
'delay in seconds to return to sync-pull requester to wait until '
'pull request will be ready',
default=30, type_cast=int)
-
-
-class HTTPStatus(Exception):
-
- status = None
- headers = None
- result = None
-
-
-class BadRequest(HTTPStatus):
-
- status = '400 Bad Request'
-
-
-class Unauthorized(HTTPStatus):
-
- status = '401 Unauthorized'
- headers = {'WWW-Authenticate': 'Sugar'}
diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py
index 74adad2..e8f2e03 100644
--- a/sugar_network/node/commands.py
+++ b/sugar_network/node/commands.py
@@ -21,6 +21,8 @@ from os.path import exists, join
import active_document as ad
from sugar_network import node
from sugar_network.node.sync_master import SyncCommands
+from sugar_network.resources.volume import Commands
+from sugar_network.toolkit import router
from active_toolkit import util, enforce
@@ -29,11 +31,11 @@ _DEFAULT_MASTER_GUID = 'api-testing.network.sugarlabs.org'
_logger = logging.getLogger('node.commands')
-class NodeCommands(ad.VolumeCommands):
+class NodeCommands(ad.VolumeCommands, Commands):
- def __init__(self, volume, subscriber=None):
+ def __init__(self, volume):
ad.VolumeCommands.__init__(self, volume)
- self._subscriber = subscriber
+ Commands.__init__(self)
self._is_master = False
node_path = join(volume.root, 'node')
@@ -62,6 +64,9 @@ class NodeCommands(ad.VolumeCommands):
if isinstance(prop, ad.BlobProperty):
self._blobs[document].add(prop.name)
+ def connect(self, callback, condition=None, **kwargs):
+ self.volume.connect(callback, condition)
+
@ad.volume_command(method='GET')
def hello(self, response):
response.content_type = 'text/html'
@@ -74,11 +79,6 @@ class NodeCommands(ad.VolumeCommands):
'seqno': self.volume.seqno.value,
}
- @ad.volume_command(method='POST', cmd='subscribe')
- def subscribe(self):
- enforce(self._subscriber is not None, 'Subscription is disabled')
- return self._subscriber.new_ticket()
-
@ad.document_command(method='DELETE',
permissions=ad.ACCESS_AUTH | ad.ACCESS_AUTHOR)
def delete(self, document, guid):
@@ -130,10 +130,11 @@ class NodeCommands(ad.VolumeCommands):
blobs = reply & self._blobs[document]
reply = list(reply - blobs)
- if not reply:
- reply = ['guid', 'layer']
- else:
- reply.append('layer')
+ if reply:
+ if 'layer' not in reply:
+ reply.append('layer')
+ if 'guid' not in reply:
+ reply.append('guid')
result = ad.VolumeCommands.get(self, document, guid, request, reply)
enforce('deleted' not in result['layer'], ad.NotFound,
@@ -150,7 +151,7 @@ class NodeCommands(ad.VolumeCommands):
return
if cmd.permissions & ad.ACCESS_AUTH:
- enforce(request.principal is not None, node.Unauthorized,
+ enforce(request.principal is not None, router.Unauthorized,
'User is not authenticated')
if cmd.permissions & ad.ACCESS_AUTHOR and 'guid' in request:
@@ -163,7 +164,6 @@ class NodeCommands(ad.VolumeCommands):
def before_create(self, request, props):
if request['document'] == 'user':
props['guid'], props['pubkey'] = _load_pubkey(props['pubkey'])
- props['user'] = [props['guid']]
else:
props['user'] = [request.principal]
self._set_author(props)
@@ -212,8 +212,8 @@ class NodeCommands(ad.VolumeCommands):
class MasterCommands(NodeCommands, SyncCommands):
- def __init__(self, volume, subscriber=None):
- NodeCommands.__init__(self, volume, subscriber)
+ def __init__(self, volume):
+ NodeCommands.__init__(self, volume)
SyncCommands.__init__(self)
diff --git a/sugar_network/node/subscribe_socket.py b/sugar_network/node/subscribe_socket.py
deleted file mode 100644
index 2a32aa0..0000000
--- a/sugar_network/node/subscribe_socket.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import socket
-import logging
-
-from sugar_network import node
-from active_toolkit import sockets, coroutine, util, enforce
-
-
-_logger = logging.getLogger('node.subscribe_socket')
-
-
-class SubscribeSocket(object):
-
- def __init__(self, volume, host, port):
- self._host = host
- self._port = port
- self._server = None
- self._tickets = set()
- self._subscribers = set()
-
- volume.connect(self.__signal_cb)
-
- def new_ticket(self):
- ticket = os.urandom(16).encode('hex')
- self._tickets.add(ticket)
- return {'host': self._host, 'port': self._port, 'ticket': ticket}
-
- def serve_forever(self):
- _logger.info('Listening for subscriptions on %s port', self._port)
-
- conn = coroutine.socket(socket.AF_INET, socket.SOCK_STREAM)
- # pylint: disable-msg=E1101
- conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- conn.bind((self._host, self._port))
- conn.listen(5)
-
- self._server = coroutine.Server(conn, self._serve_client)
- try:
- self._server.serve_forever()
- finally:
- self._server.stop()
- self._server = None
-
- def stop(self):
- if self._server is not None:
- self._server.stop()
-
- def _serve_client(self, conn, host):
- _logger.debug('Got request from %r, making a handshake', host)
-
- try:
- handshake = sockets.SocketFile(conn).read_message()
- ticket = handshake.get('ticket')
- enforce(ticket and ticket in self._tickets, 'Unknown request')
- self._tickets.remove(ticket)
- except Exception, error:
- _logger.warning('Handshake failed, discard the request: %s', error)
- return
-
- _logger.debug('Accepted %r subscriber', host)
- self._subscribers.add(conn)
- try:
- data = conn.recv(sockets.BUFFER_SIZE)
- enforce(not data, 'Subscriber misused connection '
- 'by sending %s bytes, discard it', len(data))
- except Exception:
- util.exception('Failed to handle subscription from %r', host)
- finally:
- _logger.debug('Close subscription from %r', host)
- self._subscribers.remove(conn)
-
- def __signal_cb(self, event):
- if node.only_commit_events.value:
- if event['event'] != 'commit':
- return
- else:
- if event['event'] == 'commit':
- # Subscribers already got update notifications enough
- return
-
- for conn in self._subscribers:
- sockets.SocketFile(conn).write_message(event)
diff --git a/sugar_network/resources/__init__.py b/sugar_network/resources/__init__.py
index 99eacc8..fd955e1 100644
--- a/sugar_network/resources/__init__.py
+++ b/sugar_network/resources/__init__.py
@@ -13,7 +13,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-CONTEXT_TYPES = ['activity', 'project']
+CONTEXT_TYPES = ['activity', 'project', 'package']
COMMENT_PARENTS = ['feedback', 'solution']
NOTIFICATION_TYPES = ['create', 'update', 'delete', 'vote']
FEEDBACK_TYPES = ['question', 'idea', 'problem', 'review']
diff --git a/sugar_network/resources/context.py b/sugar_network/resources/context.py
index 85c004b..5b7931f 100644
--- a/sugar_network/resources/context.py
+++ b/sugar_network/resources/context.py
@@ -103,3 +103,17 @@ class Context(Resource):
@ad.active_property(ad.StoredProperty, typecast=[int], default=(-1, -1))
def position(self, value):
return value
+
+ @classmethod
+ @ad.directory_command(method='PUT', cmd='include',
+ arguments={'layers': ad.to_list})
+ def include(cls, directory, layers, request):
+ import logging
+ logging.error('include> %r %r', layers, request.content)
+
+ @classmethod
+ @ad.directory_command(method='PUT', cmd='exclude',
+ arguments={'layers': ad.to_list})
+ def exclude(cls, directory, layers, request):
+ import logging
+ logging.error('exclude> %r %r', layers, request.content)
diff --git a/sugar_network/resources/user.py b/sugar_network/resources/user.py
index a120832..717888c 100644
--- a/sugar_network/resources/user.py
+++ b/sugar_network/resources/user.py
@@ -15,10 +15,15 @@
import active_document as ad
from sugar_network.node import stats
+from active_toolkit import enforce
class User(ad.Document):
+ @ad.active_property(prefix='L', typecast=[], default=['public'])
+ def layer(self, value):
+ return value
+
@ad.active_property(slot=1, prefix='N', full_text=True)
def name(self, value):
return value
@@ -43,7 +48,7 @@ class User(ad.Document):
def tags(self, value):
return value
- @ad.active_property(slot=5, prefix='L', full_text=True, default='')
+ @ad.active_property(slot=5, prefix='P', full_text=True, default='')
def location(self, value):
return value
@@ -51,9 +56,11 @@ class User(ad.Document):
def birthday(self, value):
return value
- @ad.document_command(method='GET', cmd='stats-info',
- permissions=ad.ACCESS_AUTHOR)
- def _stats_info(self):
+ @ad.document_command(method='GET', cmd='stats-info')
+ def _stats_info(self, request):
+ enforce(request.principal == self['guid'], ad.Forbidden,
+ 'Operation is permitted only for authors')
+
status = {}
rrd = stats.get_rrd(self.guid)
for name, __, last_update in rrd.dbs:
@@ -65,9 +72,11 @@ class User(ad.Document):
'status': status,
}
- @ad.document_command(method='POST', cmd='stats-upload',
- permissions=ad.ACCESS_AUTHOR)
+ @ad.document_command(method='POST', cmd='stats-upload')
def _stats_upload(self, request):
+ enforce(request.principal == self['guid'], ad.Forbidden,
+ 'Operation is permitted only for authors')
+
name = request.content['name']
values = request.content['values']
rrd = stats.get_rrd(self.guid)
diff --git a/sugar_network/resources/volume.py b/sugar_network/resources/volume.py
index 7344f5a..c6ac59b 100644
--- a/sugar_network/resources/volume.py
+++ b/sugar_network/resources/volume.py
@@ -13,6 +13,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
import logging
from os.path import join
@@ -29,8 +30,12 @@ _logger = logging.getLogger('resources.volume')
class Resource(ad.Document):
- @ad.active_property(prefix='RL', full_text=True, typecast=[],
- default=['public'])
+ @ad.active_property(prefix='RU', typecast=[], default=[],
+ permissions=ad.ACCESS_CREATE | ad.ACCESS_READ)
+ def user(self, value):
+ return value
+
+ @ad.active_property(prefix='RL', typecast=[], default=['public'])
def layer(self, value):
return value
@@ -78,7 +83,7 @@ class Volume(ad.SingleVolume):
return self[record['document']].merge(increment_seqno=increment_seqno,
**record)
- def diff(self, in_seq, out_packet, clone=False):
+ def diff(self, in_seq, out_packet):
# Since `in_seq` will be changed in `patch()`, original sequence
# should be passed as-is to every document's `diff()` because
# seqno handling is common for all documents
@@ -90,8 +95,7 @@ class Volume(ad.SingleVolume):
directory.commit()
def patch():
- for meta, data in directory.diff(orig_seq, limit=_DIFF_CHUNK,
- clone=clone):
+ for meta, data in directory.diff(orig_seq, limit=_DIFF_CHUNK):
coroutine.dispatch()
seqno = None
@@ -128,3 +132,45 @@ class Volume(ad.SingleVolume):
# this place, `push_seq` should contain not-collapsed sequence
orig_seq.floor(push_seq.last)
out_packet.push(force=True, cmd='sn_commit', sequence=orig_seq)
+
+
+class Commands(object):
+
+ def __init__(self):
+ self._notifier = coroutine.AsyncResult()
+ self.connect(lambda event: self._notify(event))
+
+ def connect(self, callback, condition=None, **kwargs):
+ raise NotImplementedError()
+
+ @ad.volume_command(method='GET', cmd='subscribe')
+ def subscribe(self, response, only_commits=False):
+ """Subscribe to Server-Sent Events.
+
+ :param only_commits:
+ subscribers can be notified only with "commit" events;
+ that is useful to minimize interactions between server and clients
+
+ """
+ response.content_type = 'text/event-stream'
+ response['Cache-Control'] = 'no-cache'
+ return self._pull_events(only_commits)
+
+ def _pull_events(self, only_commits):
+ while True:
+ event = self._notifier.get()
+
+ if only_commits:
+ if event['event'] != 'commit':
+ continue
+ else:
+ if event['event'] == 'commit':
+ # Subscribers already got update notifications enough
+ continue
+
+ yield 'data: %s\n\n' % json.dumps(event)
+
+ def _notify(self, event):
+ self._notifier.set(event)
+ self._notifier = coroutine.AsyncResult()
+ coroutine.dispatch()
diff --git a/sugar_network/toolkit/dbus_thread.py b/sugar_network/toolkit/dbus_thread.py
index b811a80..1cf2380 100644
--- a/sugar_network/toolkit/dbus_thread.py
+++ b/sugar_network/toolkit/dbus_thread.py
@@ -31,7 +31,7 @@ _services = []
_call_queue = coroutine.AsyncQueue()
-def start(mountset):
+def start(commands_processor):
gobject.threads_init()
threads_init()
@@ -44,11 +44,11 @@ def start(mountset):
for service in _services:
spawn(service.handle_event, event)
- mountset.connect(handle_event)
+ commands_processor.connect(handle_event)
try:
for request, reply_cb, error_cb, args in _call_queue:
try:
- reply = mountset.call(request)
+ reply = commands_processor.call(request)
except Exception, error:
util.exception(_logger, 'DBus %r request failed', request)
if error_cb is not None:
@@ -59,7 +59,7 @@ def start(mountset):
args += [reply]
spawn(reply_cb, *args)
finally:
- mountset.disconnect(handle_event)
+ commands_processor.disconnect(handle_event)
spawn(mainloop.quit)
thread.join()
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index 3efee83..627103d 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -28,6 +28,7 @@ import requests
from requests.sessions import Session
from M2Crypto import DSA
+import active_document as ad
from sweets_recipe import Bundle
from active_toolkit.sockets import decode_multipart, BUFFER_SIZE
from sugar_network.toolkit import sugar
@@ -40,101 +41,22 @@ from gevent.monkey import patch_socket
patch_socket(dns=False)
-_logger = logging.getLogger('toolkit.http')
-_session = None
+_logger = logging.getLogger('http')
-def reset():
- global _session
- _session = None
+class Client(object):
+ def __init__(self, api_url, **kwargs):
+ self._api_url = api_url
+ self._params = kwargs
-def download(url_path, out_path, seqno=None, extract=False):
- if isdir(out_path):
- shutil.rmtree(out_path)
- elif not exists(dirname(out_path)):
- os.makedirs(dirname(out_path))
-
- params = {}
- if seqno:
- params['seqno'] = seqno
-
- response = _request('GET', url_path, allow_redirects=True,
- params=params, allowed_response=[404])
- if response.status_code != 200:
- return 'application/octet-stream'
-
- mime_type = response.headers.get('Content-Type') or \
- 'application/octet-stream'
-
- content_length = response.headers.get('Content-Length')
- content_length = int(content_length) if content_length else 0
- if seqno and not content_length:
- # Local cacheed versions is not stale
- return mime_type
-
- def fetch(f):
- _logger.debug('Download %r BLOB to %r', '/'.join(url_path), out_path)
- chunk_size = min(content_length, BUFFER_SIZE)
- empty = True
- for chunk in response.iter_content(chunk_size=chunk_size):
- empty = False
- f.write(chunk)
- return not empty
-
- def fetch_multipart(stream, size, boundary):
- stream.readline = None
- for filename, content in decode_multipart(stream, size, boundary):
- dst_path = join(out_path, filename)
- if not exists(dirname(dst_path)):
- os.makedirs(dirname(dst_path))
- shutil.move(content.name, dst_path)
-
- content_type, params = cgi.parse_header(mime_type)
- if content_type.split('/', 1)[0] == 'multipart':
- try:
- fetch_multipart(response.raw, content_length, params['boundary'])
- except Exception:
- shutil.rmtree(out_path, ignore_errors=True)
- raise
- elif extract:
- tmp_file = tempfile.NamedTemporaryFile(delete=False)
- try:
- if fetch(tmp_file):
- tmp_file.close()
- with Bundle(tmp_file.name, 'application/zip') as bundle:
- bundle.extractall(out_path)
- finally:
- if exists(tmp_file.name):
- os.unlink(tmp_file.name)
- else:
- with file(out_path, 'w') as f:
- if not fetch(f):
- os.unlink(out_path)
-
- return mime_type
-
-
-def request(method, path, data=None, headers=None, **kwargs):
- response = _request(method, path, data, headers, **kwargs)
- if response.headers.get('Content-Type') == 'application/json':
- return json.loads(response.content)
- else:
- return response
-
-
-def _request(method, path, data=None, headers=None, allowed_response=None,
- **kwargs):
- global _session
-
- if _session is None:
verify = True
if local.no_check_certificate.value:
verify = False
elif local.certfile.value:
verify = local.certfile.value
- headers = None
+ headers = {'Accept-Language': ','.join(ad.default_lang())}
if not local.anonymous.value:
uid = sugar.uid()
key_path = sugar.profile_path('owner.key')
@@ -143,61 +65,190 @@ def _request(method, path, data=None, headers=None, allowed_response=None,
'sugar_user_signature': _sign(key_path, uid),
}
- _session = Session(headers=headers, verify=verify, prefetch=False)
-
- if not path:
- path = ['']
- if not isinstance(path, basestring):
- path = '/'.join([i.strip('/') for i in [local.api_url.value] + path])
-
- if data is not None and headers and \
- headers.get('Content-Type') == 'application/json':
- data = json.dumps(data)
-
- while True:
- try:
- response = requests.request(method, path, data=data,
- headers=headers, session=_session, **kwargs)
- except requests.exceptions.SSLError:
- _logger.warning('Pass --no-check-certificate to avoid SSL checks')
- raise
-
+ self._session = Session(headers=headers, verify=verify, prefetch=False)
+
+ def get(self, path_=None, **kwargs):
+ kwargs.update(self._params)
+ return self.request('GET', path_, params=kwargs)
+
+ def post(self, path_=None, data_=None, **kwargs):
+ kwargs.update(self._params)
+ return self.request('POST', path_, data_,
+ headers={'Content-Type': 'application/json'}, params=kwargs)
+
+ def put(self, path_=None, data_=None, **kwargs):
+ kwargs.update(self._params)
+ return self.request('PUT', path_, data_,
+ headers={'Content-Type': 'application/json'}, params=kwargs)
+
+ def delete(self, path_=None, **kwargs):
+ kwargs.update(self._params)
+ return self.request('DELETE', path_, params=kwargs)
+
+ def request(self, method, path=None, data=None, headers=None, **kwargs):
+ response = self._request(method, path, data, headers, **kwargs)
+ if response.headers.get('Content-Type') == 'application/json':
+ return json.loads(response.content)
+ else:
+ return response
+
+ def call(self, request):
+ method = request.pop('method')
+ document = request.pop('document')
+ guid = request.pop('guid') if 'guid' in request else None
+ prop = request.pop('prop') if 'prop' in request else None
+
+ path = [document]
+ if guid:
+ path.append(guid)
+ if prop:
+ path.append(prop)
+
+ return self.request(method, path, data=request.content, params=request,
+ headers={'Content-Type': 'application/json'})
+
+ def download(self, url_path, out_path, seqno=None, extract=False):
+ if isdir(out_path):
+ shutil.rmtree(out_path)
+ elif not exists(dirname(out_path)):
+ os.makedirs(dirname(out_path))
+
+ params = {}
+ if seqno:
+ params['seqno'] = seqno
+
+ response = self._request('GET', url_path, allow_redirects=True,
+ params=params, allowed=[404])
if response.status_code != 200:
- if response.status_code == 401:
- enforce(not local.anonymous.value,
- 'Operation is not available in anonymous mode')
- _logger.info('User is not registered on the server, '
- 'registering')
- _register()
- continue
- if allowed_response and response.status_code in allowed_response:
- return response
- content = response.content
+ return 'application/octet-stream'
+
+ mime_type = response.headers.get('Content-Type') or \
+ 'application/octet-stream'
+
+ content_length = response.headers.get('Content-Length')
+ content_length = int(content_length) if content_length else 0
+ if seqno and not content_length:
+ # Local cacheed versions is not stale
+ return mime_type
+
+ def fetch(f):
+ _logger.debug('Download %r BLOB to %r',
+ '/'.join(url_path), out_path)
+ chunk_size = min(content_length, BUFFER_SIZE)
+ empty = True
+ for chunk in response.iter_content(chunk_size=chunk_size):
+ empty = False
+ f.write(chunk)
+ return not empty
+
+ def fetch_multipart(stream, size, boundary):
+ stream.readline = None
+ for filename, content in decode_multipart(stream, size, boundary):
+ dst_path = join(out_path, filename)
+ if not exists(dirname(dst_path)):
+ os.makedirs(dirname(dst_path))
+ shutil.move(content.name, dst_path)
+
+ content_type, params = cgi.parse_header(mime_type)
+ if content_type.split('/', 1)[0] == 'multipart':
try:
- error = json.loads(content)
+ fetch_multipart(response.raw, content_length,
+ params['boundary'])
except Exception:
- _logger.debug('Got %s HTTP error for %r request:\n%s',
- response.status_code, path, content)
- response.raise_for_status()
- else:
- raise RuntimeError(error['error'])
+ shutil.rmtree(out_path, ignore_errors=True)
+ raise
+ elif extract:
+ tmp_file = tempfile.NamedTemporaryFile(delete=False)
+ try:
+ if fetch(tmp_file):
+ tmp_file.close()
+ with Bundle(tmp_file.name, 'application/zip') as bundle:
+ bundle.extractall(out_path)
+ finally:
+ if exists(tmp_file.name):
+ os.unlink(tmp_file.name)
+ else:
+ with file(out_path, 'w') as f:
+ if not fetch(f):
+ os.unlink(out_path)
- return response
+ return mime_type
+ def subscribe(self):
+ response = self.request('GET', params={'cmd': 'subscribe'})
+ for line in _readlines(response.raw):
+ if line.startswith('data: '):
+ yield json.loads(line.split(' ', 1)[1])
-def _register():
- _request('POST', ['user'],
- headers={'Content-Type': 'application/json'},
- data={
- 'name': sugar.nickname() or '',
- 'color': sugar.color() or '#000000,#000000',
- 'machine_sn': sugar.machine_sn() or '',
- 'machine_uuid': sugar.machine_uuid() or '',
- 'pubkey': sugar.pubkey(),
- },
- )
+ def _request(self, method, path, data=None, headers=None, allowed=None,
+ **kwargs):
+ if not path:
+ path = ['']
+ if not isinstance(path, basestring):
+ path = '/'.join([i.strip('/') for i in [self._api_url] + path])
+
+ if data is not None and headers and \
+ headers.get('Content-Type') == 'application/json':
+ data = json.dumps(data)
+
+ while True:
+ try:
+ response = requests.request(method, path, data=data,
+ headers=headers, session=self._session, **kwargs)
+ except requests.exceptions.SSLError:
+ _logger.warning('Use --no-check-certificate to avoid checks')
+ raise
+
+ if response.status_code != 200:
+ if response.status_code == 401:
+ enforce(not local.anonymous.value,
+ 'Operation is not available in anonymous mode')
+ _logger.info('User is not registered on the server, '
+ 'registering')
+ self._register()
+ continue
+ if allowed and response.status_code in allowed:
+ return response
+ content = response.content
+ try:
+ error = json.loads(content)
+ except Exception:
+ _logger.debug('Got %s HTTP error for %r request:\n%s',
+ response.status_code, path, content)
+ response.raise_for_status()
+ else:
+ raise RuntimeError(error['error'])
+
+ return response
+
+ def _register(self):
+ self._request('POST', ['user'],
+ headers={
+ 'Content-Type': 'application/json',
+ },
+ data={
+ 'name': sugar.nickname() or '',
+ 'color': sugar.color() or '#000000,#000000',
+ 'machine_sn': sugar.machine_sn() or '',
+ 'machine_uuid': sugar.machine_uuid() or '',
+ 'pubkey': sugar.pubkey(),
+ },
+ )
def _sign(key_path, data):
key = DSA.load_key(key_path)
return key.sign_asn1(hashlib.sha1(data).digest()).encode('hex')
+
+
+def _readlines(stream):
+ line = ''
+ while True:
+ char = stream.read(1)
+ if not char:
+ break
+ if char == '\n':
+ yield line
+ line = ''
+ else:
+ line += char
diff --git a/sugar_network/toolkit/ipc.py b/sugar_network/toolkit/ipc.py
deleted file mode 100644
index 3e9471e..0000000
--- a/sugar_network/toolkit/ipc.py
+++ /dev/null
@@ -1,76 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import errno
-import logging
-
-from sugar_network import local
-
-
-_logger = logging.getLogger('toolkit.ipc')
-
-
-def rendezvous(server=False):
- """Rendezvous barrier to synchronize one server and multiple clients.
-
- :param server:
- if caller is a server
- :returns:
- if `server` is `True`, file descriptor that should be closed
- on server shutting down
-
- """
- rendezvous_path = local.ensure_path('run', 'rendezvous')
-
- try:
- os.mkfifo(rendezvous_path)
- except OSError, error:
- if error.errno != errno.EEXIST:
- raise
-
- if server:
- _logger.info('Start accepting clients on %r', rendezvous_path)
- return os.open(rendezvous_path, os.O_RDONLY | os.O_NONBLOCK)
- else:
- _logger.debug('Connecting to %r IPC server', rendezvous_path)
- # Will be blocked until server will call `rendezvous(server=True)`
- fd = os.open(rendezvous_path, os.O_WRONLY)
- _logger.info('Connected successfully')
- # No need in fd any more
- os.close(fd)
-
-
-def server_exists(server=False):
- """The same as `rendezvous()` but for client side and without waiting."""
- rendezvous_path = local.ensure_path('run', 'rendezvous')
-
- try:
- os.mkfifo(rendezvous_path)
- except OSError, error:
- if error.errno != errno.EEXIST:
- raise
-
- try:
- fd = os.open(rendezvous_path, os.O_WRONLY | os.O_NONBLOCK)
- except OSError, error:
- if error.errno == errno.ENXIO:
- _logger.debug('No server launched')
- return False
-
- _logger.info('Connected successfully')
- # No need in fd any more
- os.close(fd)
- return True
diff --git a/sugar_network/node/router.py b/sugar_network/toolkit/router.py
index ee1fa1c..d02dd28 100644
--- a/sugar_network/node/router.py
+++ b/sugar_network/toolkit/router.py
@@ -23,12 +23,30 @@ from bisect import bisect_left
from os.path import join, isfile
import active_document as ad
-from sugar_network import node, static
+from sugar_network import static
from active_toolkit.sockets import BUFFER_SIZE
from active_toolkit import util, enforce
-_logger = logging.getLogger('node.router')
+_logger = logging.getLogger('router')
+
+
+class HTTPStatus(Exception):
+
+ status = None
+ headers = None
+ result = None
+
+
+class BadRequest(HTTPStatus):
+
+ status = '400 Bad Request'
+
+
+class Unauthorized(HTTPStatus):
+
+ status = '401 Unauthorized'
+ headers = {'WWW-Authenticate': 'Sugar'}
class Router(object):
@@ -41,6 +59,61 @@ class Router(object):
# Otherwise ssh-keygen will popup auth dialogs on registeration
del os.environ['SSH_ASKPASS']
+ def authenticate(self, request):
+ user = request.environ.get('HTTP_SUGAR_USER')
+ if user is None:
+ return None
+
+ if user not in self._authenticated and \
+ (request.path != ['user'] or request['method'] != 'POST'):
+ _logger.debug('Logging %r user', user)
+ request = ad.Request(method='GET', cmd='exists',
+ document='user', guid=user)
+ enforce(self._cp.call(request, ad.Response()), Unauthorized,
+ 'Principal user does not exist')
+ self._authenticated.add(user)
+
+ return user
+
+ def call(self, request, response):
+ if 'HTTP_ORIGIN' in request.environ:
+ enforce(request.environ['HTTP_ORIGIN'] == 'null', ad.Forbidden,
+ 'Cross-site is allowed only for local applications')
+ response['Access-Control-Allow-Origin'] = \
+ request.environ['HTTP_ORIGIN']
+
+ if request['method'] == 'OPTIONS':
+ # TODO Process OPTIONS request per url?
+ if request.environ['HTTP_ORIGIN']:
+ response['Access-Control-Allow-Methods'] = \
+ request.environ['HTTP_ACCESS_CONTROL_REQUEST_METHOD']
+ response['Access-Control-Allow-Headers'] = \
+ request.environ['HTTP_ACCESS_CONTROL_REQUEST_HEADERS']
+ else:
+ response['Allow'] = 'GET, POST, PUT, DELETE'
+ response.content_length = 0
+ return None
+
+ request.principal = self.authenticate(request)
+ if request.path[:1] == ['static']:
+ static_path = join(static.PATH, *request.path[1:])
+ enforce(isfile(static_path), 'No such file')
+ result = file(static_path)
+ else:
+ result = self._cp.call(request, response)
+
+ if hasattr(result, 'read'):
+ # pylint: disable-msg=E1103
+ if hasattr(result, 'fileno'):
+ response.content_length = os.fstat(result.fileno()).st_size
+ elif hasattr(result, 'seek'):
+ result.seek(0, 2)
+ response.content_length = result.tell()
+ result.seek(0)
+ result = _stream_reader(result)
+
+ return result
+
def __call__(self, environ, start_response):
request = _Request(environ)
response = _Response()
@@ -51,7 +124,7 @@ class Router(object):
result = None
try:
- result = self._call(request, response)
+ result = self.call(request, response)
except ad.Redirect, error:
response.status = '303 See Other'
response['Location'] = error.location
@@ -64,7 +137,7 @@ class Router(object):
response.status = '404 Not Found'
elif isinstance(error, ad.Forbidden):
response.status = '403 Forbidden'
- elif isinstance(error, node.HTTPStatus):
+ elif isinstance(error, HTTPStatus):
response.status = error.status
response.update(error.headers or {})
result = error.result
@@ -101,61 +174,6 @@ class Router(object):
elif result is not None:
yield result
- def _call(self, request, response):
- if 'HTTP_ORIGIN' in request.environ:
- enforce(request.environ['HTTP_ORIGIN'] == 'null', ad.Forbidden,
- 'Cross-site is allowed only for local applications')
- response['Access-Control-Allow-Origin'] = \
- request.environ['HTTP_ORIGIN']
-
- if request['method'] == "OPTIONS":
- # TODO Process OPTIONS request per url?
- if request.environ['HTTP_ORIGIN']:
- response['Access-Control-Allow-Methods'] = \
- request.environ['HTTP_ACCESS_CONTROL_REQUEST_METHOD']
- response['Access-Control-Allow-Headers'] = \
- request.environ['HTTP_ACCESS_CONTROL_REQUEST_HEADERS']
- else:
- response['Allow'] = "GET, POST, PUT, DELETE"
- response.content_length = 0
- return None
-
- request.principal = self._authenticate(request)
- if request.path[:1] == ['static']:
- static_path = join(static.PATH, *request.path[1:])
- enforce(isfile(static_path), 'No such file')
- result = file(static_path)
- else:
- result = self._cp.call(request, response)
-
- if hasattr(result, 'read'):
- # pylint: disable-msg=E1103
- if hasattr(result, 'fileno'):
- response.content_length = os.fstat(result.fileno()).st_size
- elif hasattr(result, 'seek'):
- result.seek(0, 2)
- response.content_length = result.tell()
- result.seek(0)
- result = _stream_reader(result)
-
- return result
-
- def _authenticate(self, request):
- user = request.environ.get('HTTP_SUGAR_USER')
- if user is None:
- return None
-
- if user not in self._authenticated and \
- (request.path != ['user'] or request['method'] != 'POST'):
- _logger.debug('Logging %r user', user)
- request = ad.Request(method='GET', cmd='exists',
- document='user', guid=user)
- enforce(self._cp.call(request, ad.Response()), node.Unauthorized,
- 'Principal user does not exist')
- self._authenticated.add(user)
-
- return user
-
class _Request(ad.Request):
@@ -202,7 +220,7 @@ class _Request(ad.Request):
self.content_stream = files.list[0].file
scope = len(self.path)
- enforce(scope >= 0 and scope < 4, node.BadRequest,
+ enforce(scope >= 0 and scope < 4, BadRequest,
'Incorrect requested path')
if scope == 3:
self['document'], self['guid'], self['prop'] = self.path
diff --git a/sugar_network/zerosugar/feeds.py b/sugar_network/zerosugar/feeds.py
index 26a4eb8..a1542b6 100644
--- a/sugar_network/zerosugar/feeds.py
+++ b/sugar_network/zerosugar/feeds.py
@@ -34,8 +34,9 @@ def read(context):
client = None
for client in config.clients:
try:
- with client.Context(context).get_blob('feed') as f:
- enforce(not f.closed, 'No feed for %r context', context)
+ blob = client.get(['context', context, 'feed'], cmd='get_blob')
+ enforce(blob and 'path' in blob, 'No feed for %r context', context)
+ with file(blob['path']) as f:
feed_content = json.load(f)
if feed_content:
break
diff --git a/sugar_network/zerosugar/injector.py b/sugar_network/zerosugar/injector.py
index af80321..2189ab8 100644
--- a/sugar_network/zerosugar/injector.py
+++ b/sugar_network/zerosugar/injector.py
@@ -30,7 +30,7 @@ from zeroinstall.injector.requirements import Requirements
from sweets_recipe import Spec
from sugar_network.zerosugar import solver
from sugar_network.zerosugar.config import config
-from sugar_network import local, Client
+from sugar_network import local
from sugar_network.toolkit import sugar
from active_toolkit import coroutine, util, enforce
@@ -129,20 +129,20 @@ def _fork(callback, mountpoint, context, *args):
os.close(fd_w)
return Pipe(pid, fd_r)
+ from sugar_network import IPCClient
+
os.close(fd_r)
global _pipe
_pipe = fd_w
- Client.close()
-
def thread_func():
log_path = _setup_logging(context)
_progress('stat', log_path=log_path, mountpoint=mountpoint,
context=context)
- config.clients = [Client('~')]
+ config.clients = [IPCClient(mountpoint='~')]
if mountpoint != '~':
- config.clients.append(Client(mountpoint))
+ config.clients.append(IPCClient(mountpoint=mountpoint))
try:
callback(mountpoint, context, *args)
@@ -150,7 +150,7 @@ def _fork(callback, mountpoint, context, *args):
util.exception(_logger)
_progress('failure', error=str(error))
- # To avoid execution current thread coroutine
+ # Avoid a mess with current thread coroutines
thread = threading.Thread(target=thread_func)
thread.start()
thread.join()
@@ -219,9 +219,10 @@ def _make(context, command):
# TODO Per download progress
_progress('download', progress=-1)
- impl = sel.client.Implementation(sel.id)
- impl_path, __ = impl.get_blob_path('data')
- enforce(impl_path, 'Cannot download implementation')
+ impl = sel.client.get(['implementation', sel.id, 'data'],
+ cmd='get_blob')
+ enforce(impl and 'path' in impl, 'Cannot download implementation')
+ impl_path = impl['path']
dl = sel.download_sources[0]
if dl.extract is not None:
diff --git a/tests/__init__.py b/tests/__init__.py
index 2c455c0..18a2750 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -17,17 +17,15 @@ from M2Crypto import DSA
import active_document as ad
from active_toolkit import coroutine
-from sugar_network.client import bus
from sugar_network.toolkit import sugar, http, sneakernet, mounts_monitor
-from sugar_network.local.bus import IPCServer
+from sugar_network.toolkit.router import Router
+from sugar_network.local.ipc_client import Router as IPCRouter
from sugar_network.local.mounts import HomeMount, RemoteMount
from sugar_network.local.mountset import Mountset
from sugar_network import local, node
from sugar_network.resources.user import User
from sugar_network.resources.context import Context
-from sugar_network.node.router import Router
from sugar_network.node.commands import NodeCommands
-from sugar_network.node.subscribe_socket import SubscribeSocket
from sugar_network.node import stats
from sugar_network.resources.volume import Volume
@@ -74,7 +72,6 @@ class Test(unittest.TestCase):
ad.index_flush_threshold.value = 1
node.find_limit.value = 1024
node.tmpdir.value = tmpdir + '/tmp'
- node.only_commit_events.value = False
node.data_root.value = tmpdir
node.sync_dirs.value = []
ad.index_write_queue.value = 10
@@ -83,6 +80,7 @@ class Test(unittest.TestCase):
local.api_url.value = 'http://localhost:8800'
local.server_mode.value = False
local.mounts_root.value = None
+ local.ipc_port.value = 5101
mounts_monitor.stop()
mounts_monitor._COMPLETE_MOUNT_TIMEOUT = .1
stats.stats_root.value = tmpdir + '/stats'
@@ -107,11 +105,6 @@ class Test(unittest.TestCase):
self._logfile = file(self.logfile + '.out', 'a')
sys.stdout = sys.stderr = self._logfile
- bus._CONNECTION_POOL = 1
- bus.Client.close()
-
- http.reset()
-
for handler in logging.getLogger().handlers:
logging.getLogger().removeHandler(handler)
logging.basicConfig(level=logging.DEBUG, filename=self.logfile)
@@ -132,7 +125,6 @@ class Test(unittest.TestCase):
def stop_servers(self):
if self.mounts is not None:
self.mounts.close()
- bus.Client.close()
if self.server is not None:
self.server.stop()
while self.forks:
@@ -195,7 +187,7 @@ class Test(unittest.TestCase):
pid = os.fork()
if pid:
self.forks.append(pid)
- coroutine.sleep(1)
+ coroutine.sleep(2)
return pid
self.fork_num += 1
@@ -231,7 +223,8 @@ class Test(unittest.TestCase):
self.mounts['~'] = HomeMount(volume)
if root:
self.mounts['/'] = RemoteMount(volume)
- self.server = IPCServer(self.mounts)
+ self.server = coroutine.WSGIServer(
+ ('localhost', local.ipc_port.value), IPCRouter(self.mounts))
coroutine.spawn(self.server.serve_forever)
self.mounts.open()
self.mounts.opened.wait()
@@ -263,17 +256,14 @@ class Test(unittest.TestCase):
ad.index_write_queue.value = 10
volume = Volume('remote', classes or [User, Context])
- subscriber = SubscribeSocket(volume, 'localhost', 8801)
- cp = NodeCommands(volume, subscriber)
+ cp = NodeCommands(volume)
httpd = coroutine.WSGIServer(('localhost', 8800), Router(cp))
try:
coroutine.joinall([
coroutine.spawn(httpd.serve_forever),
- coroutine.spawn(subscriber.serve_forever),
])
finally:
httpd.stop()
- subscriber.stop()
volume.close()
diff --git a/tests/units/__main__.py b/tests/units/__main__.py
index 23c08f8..0eac03b 100644
--- a/tests/units/__main__.py
+++ b/tests/units/__main__.py
@@ -3,26 +3,23 @@
from __init__ import tests
from collection import *
-from sneakernet import *
-from bus import *
-from activities import *
-from ipc_client import *
-from injector import *
-from subscribe_socket import *
-from router import *
from volume import *
from local import *
from node import *
-from mountset import *
-from home_mount import *
-from remote_mount import *
-from node_mount import *
-from sync_master import *
-from sync_node import *
+from dbus_client import *
from datastore import *
from dbus_datastore import *
+from sneakernet import *
+from router import *
from files_sync import *
-from dbus_client import *
+from sync_node import *
+from sync_master import *
from mounts_monitor import *
+from activities import *
+from home_mount import *
+from remote_mount import *
+from node_mount import *
+from injector import *
+from mountset import *
tests.main()
diff --git a/tests/units/bus.py b/tests/units/bus.py
deleted file mode 100755
index c38a69a..0000000
--- a/tests/units/bus.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import time
-import json
-from os.path import join
-
-from __init__ import tests
-
-import active_document as ad
-from active_toolkit import coroutine
-from sugar_network.client import bus
-from sugar_network import Client, ServerError
-from sugar_network.local.bus import IPCServer
-from sugar_network.local.mountset import Mountset
-from sugar_network.resources.volume import Volume
-
-
-class BusTest(tests.Test):
-
- def test_Rendezvous(self):
-
- def server():
- time.sleep(1)
- volume = Volume('local', [])
- mounts = Mountset(volume)
- server = IPCServer(mounts)
- mounts.call = lambda *args: None
- server.serve_forever()
-
- ts = time.time()
- self.fork(server)
-
- client = Client('/')
- client.Context.delete('guid')
- assert time.time() - ts >= 1
-
- def test_Exception(self):
- self.start_server()
-
- def call_handle(request, response):
- raise RuntimeError()
-
- self.mounts.call = call_handle
-
- client = Client('/')
- self.assertRaises(ServerError, client.Resource.delete, 'fake')
-
- def test_ConsecutiveRequests(self):
- self.start_server()
- calls = []
- self.mounts.call = lambda request, response: calls.append(dict(request))
-
- bus._CONNECTION_POOL = 3
-
- def caller(client, i, n):
- getattr(client, 'Resource%s' % i).delete('wait%s%s' % (i, n))
-
- ts = time.time()
- clients = [Client('/'), Client('/'), Client('/')]
- call_jobs = []
- for i, client in enumerate(clients):
- for n in range(9):
- call_jobs.append(coroutine.spawn(caller, client, i, n))
-
- coroutine.joinall(call_jobs)
- assert time.time() - ts < 4
-
- standard = []
- for i, client in enumerate(clients):
- for n in range(9):
- standard.append({'method': 'DELETE', 'mountpoint': '/', 'guid': 'wait%s%s' % (i, n), 'document': 'resource%s' % i})
- self.assertEqual(
- sorted(standard),
- sorted(calls))
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/dbus_client.py b/tests/units/dbus_client.py
index 04b2307..3f5c1ef 100755
--- a/tests/units/dbus_client.py
+++ b/tests/units/dbus_client.py
@@ -5,7 +5,7 @@ import os
import sys
from os.path import abspath
-arg0 = abspath(__file__)
+arg0 = abspath(__file__).replace('.pyc', '.py')
import dbus
import gobject
@@ -33,7 +33,7 @@ class DbusClientTest(tests.Test):
return
self.fork(os.execvp, arg0, [arg0, self.id().split('.')[-1], 'fork'])
- coroutine.sleep(3)
+ coroutine.sleep(1)
def test_Call(self):
client = DBusClient(mountpoint='~')
diff --git a/tests/units/dbus_datastore.py b/tests/units/dbus_datastore.py
index f95059a..72398ad 100755
--- a/tests/units/dbus_datastore.py
+++ b/tests/units/dbus_datastore.py
@@ -6,7 +6,7 @@ import sys
import time
from os.path import exists, abspath, dirname
-arg0 = abspath(__file__)
+arg0 = abspath(__file__).replace('.pyc', '.py')
import dbus
import gobject
@@ -88,7 +88,7 @@ class DbusDatastoreTest(tests.Test):
'activity_id': 'activity_id',
'filesize': '0',
'creation_time': '1',
- 'timestamp': '1',
+ 'timestamp': '-1',
'mtime': '1970-01-01T00:00:01',
'tags': 'tags',
},
diff --git a/tests/units/home_mount.py b/tests/units/home_mount.py
index c9c9119..c0cfaae 100755
--- a/tests/units/home_mount.py
+++ b/tests/units/home_mount.py
@@ -3,29 +3,30 @@
import os
import socket
-from os.path import exists
+from os.path import exists, abspath
from __init__ import tests
from active_toolkit import sockets, coroutine
-from sugar_network import Client
from sugar_network.resources.report import Report
+from sugar_network import IPCClient
class HomeMountTest(tests.Test):
def test_create(self):
self.start_server()
- local = Client('~')
-
- guid = local.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
+ local = IPCClient(mountpoint='~')
+
+ guid = local.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
self.assertNotEqual(None, guid)
- res = local.Context(guid, ['title', 'keep', 'keep_impl', 'position'])
+ res = local.get(['context', guid], reply=['guid', 'title', 'keep', 'keep_impl', 'position'])
self.assertEqual(guid, res['guid'])
self.assertEqual('title', res['title'])
self.assertEqual(False, res['keep'])
@@ -34,253 +35,141 @@ class HomeMountTest(tests.Test):
def test_update(self):
self.start_server()
- local = Client('~')
-
- guid = local.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
-
- context = local.Context(guid)
- context['title'] = 'title_2'
- context['keep'] = True
- context['position'] = (2, 3)
- context.post()
-
- context = local.Context(guid, ['title', 'keep', 'position'])
+ local = IPCClient(mountpoint='~')
+
+ guid = local.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+
+ local.put(['context', guid], {
+ 'title': 'title_2',
+ 'keep': True,
+ 'position': (2, 3),
+ })
+
+ context = local.get(['context', guid], reply=['title', 'keep', 'position'])
self.assertEqual('title_2', context['title'])
self.assertEqual(True, context['keep'])
self.assertEqual([2, 3], context['position'])
- def test_get(self):
- self.start_server()
- local = Client('~')
-
- guid = local.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
-
- context = local.Context(guid, ['title', 'keep', 'keep_impl', 'position'])
- self.assertEqual(guid, context['guid'])
- self.assertEqual('title', context['title'])
- self.assertEqual(False, context['keep'])
- self.assertEqual(0, context['keep_impl'])
- self.assertEqual([-1, -1], context['position'])
-
def test_find(self):
self.start_server()
- local = Client('~')
-
- guid_1 = local.Context(
- type='activity',
- title='title_1',
- summary='summary',
- description='description').post()
- guid_2 = local.Context(
- type='activity',
- title='title_2',
- summary='summary',
- description='description').post()
- guid_3 = local.Context(
- type='activity',
- title='title_3',
- summary='summary',
- description='description').post()
-
- cursor = local.Context.cursor(reply=['guid', 'title', 'keep', 'keep_impl', 'position'])
- self.assertEqual(3, cursor.total)
+ local = IPCClient(mountpoint='~')
+
+ guid_1 = local.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title_1',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+ guid_2 = local.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title_2',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+ guid_3 = local.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title_3',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+
+ cursor = local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl', 'position'])
+ self.assertEqual(3, cursor['total'])
self.assertEqual(
sorted([
(guid_1, 'title_1', False, 0, [-1, -1]),
(guid_2, 'title_2', False, 0, [-1, -1]),
(guid_3, 'title_3', False, 0, [-1, -1]),
]),
- sorted([(i['guid'], i['title'], i['keep'], i['keep_impl'], i['position']) for i in cursor]))
+ sorted([(i['guid'], i['title'], i['keep'], i['keep_impl'], i['position']) for i in cursor['result']]))
def test_upload_blob(self):
self.start_server()
- local = Client('~')
+ local = IPCClient(mountpoint='~')
- guid = local.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
+ guid = local.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
self.touch(('file', 'blob'))
- local.Context(guid).upload_blob('preview', 'file')
- self.assertEqual('blob', local.Context(guid).get_blob('preview').read())
+ local.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file'))
+ blob = local.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob', file(blob['path']).read())
self.touch(('file2', 'blob2'))
- local.Context(guid).upload_blob('preview', 'file2', pass_ownership=True)
- self.assertEqual('blob2', local.Context(guid).get_blob('preview').read())
+ local.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file2'), pass_ownership=True)
+ blob = local.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob2', file(blob['path']).read())
assert not exists('file2')
def test_GetAbsetnBLOB(self):
self.start_server([Report])
- client = Client('~')
+ local = IPCClient(mountpoint='~')
- guid = client.Report(
- context='context',
- implementation='implementation',
- description='description').post()
+ guid = local.post(['report'], {
+ 'context': 'context',
+ 'implementation': 'implementation',
+ 'description': 'description',
+ })
- path, mime_type = client.Report(guid).get_blob_path('data')
- self.assertEqual(None, path)
- self.assertEqual(True, client.Report(guid).get_blob('data').closed)
+ self.assertEqual(None, local.get(['report', guid, 'data'], cmd='get_blob'))
def test_GetDefaultBLOB(self):
self.start_server()
- client = Client('~')
+ local = IPCClient(mountpoint='~')
- guid = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
+ guid = local.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
- path, mime_type = client.Context(guid).get_blob_path('icon')
- assert path.endswith('missing.png')
- assert exists(path)
- self.assertEqual(False, client.Context(guid).get_blob('icon').closed)
+ blob = local.get(['context', guid, 'icon'], cmd='get_blob')
+ assert blob['path'].endswith('missing.png')
+ assert exists(blob['path'])
def test_Subscription(self):
self.start_server()
- client = Client('~')
-
- subscription = sockets.SocketFile(coroutine.socket(socket.AF_UNIX))
- subscription.connect('run/subscribe')
- coroutine.sleep()
-
- guid = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
-
- coroutine.select([subscription.fileno()], [], [])
- event = subscription.read_message()
- event.pop('props')
- self.assertEqual(
- {'document': 'context', 'event': 'create', 'guid': guid, 'seqno': 1},
- event)
- self.assertEqual(
- {'mountpoint': '~', 'document': 'context', 'event': 'commit', 'seqno': 1},
- subscription.read_message())
-
- client.Context(guid, title='new-title').post()
-
- coroutine.select([subscription.fileno()], [], [])
- event = subscription.read_message()
- event.pop('props')
- self.assertEqual(
- {'mountpoint': '~', 'document': 'context', 'event': 'update', 'guid': guid, 'seqno': 2},
- event)
- self.assertEqual(
- {'mountpoint': '~', 'document': 'context', 'event': 'commit', 'seqno': 2},
- subscription.read_message())
-
- client.Context.delete(guid)
-
- coroutine.select([subscription.fileno()], [], [])
- self.assertEqual(
- {'mountpoint': '~', 'document': 'context', 'event': 'delete', 'guid': guid},
- subscription.read_message())
- self.assertEqual(
- {'mountpoint': '~', 'document': 'context', 'event': 'commit', 'seqno': 2},
- subscription.read_message())
-
- def test_Subscription_NotifyOnline(self):
- self.start_ipc_and_restful_server()
-
- local = Client('~')
- remote = Client('/')
-
- guid = remote.Context(
- type='activity',
- title={'en': 'title'},
- summary={'en': 'summary'},
- description={'en': 'description'},
- keep=True).post()
-
- subscription = sockets.SocketFile(coroutine.socket(socket.AF_UNIX))
- subscription.connect('run/subscribe')
- coroutine.sleep(1)
-
- local.Context(guid, keep=False).post()
- coroutine.sleep(1)
-
- coroutine.select([subscription.fileno()], [], [])
- event = subscription.read_message()
- event.pop('props')
- self.assertEqual(
- {'document': 'context', 'event': 'update', 'guid': guid, 'seqno': 4},
- event)
-
- def test_Connect(self):
- self.start_server()
- client = Client('~')
-
- self.assertEqual(True, client.connected)
-
- def test_Localize(self):
- os.environ['LANG'] = 'en_US'
- self.start_server()
- client = Client('~')
-
- guid = client.Context(
- type='activity',
- title='title_en',
- summary='summary_en',
- description='description_en').post()
-
- res = client.Context(guid, ['title', 'summary', 'description'])
- self.assertEqual('title_en', res['title'])
- self.assertEqual('summary_en', res['summary'])
- self.assertEqual('description_en', res['description'])
-
- self.stop_servers()
- os.environ['LANG'] = 'ru_RU'
- self.start_server()
- client = Client('~')
-
- res = client.Context(guid, ['title', 'summary', 'description'])
- self.assertEqual('title_en', res['title'])
- self.assertEqual('summary_en', res['summary'])
- self.assertEqual('description_en', res['description'])
-
- res['title'] = 'title_ru'
- res['summary'] = 'summary_ru'
- res['description'] = 'description_ru'
- res.post()
-
- res = client.Context(guid, ['title', 'summary', 'description'])
- self.assertEqual('title_ru', res['title'])
- self.assertEqual('summary_ru', res['summary'])
- self.assertEqual('description_ru', res['description'])
-
- self.stop_servers()
- os.environ['LANG'] = 'es_ES'
- self.start_server()
- client = Client('~')
-
- res = client.Context(guid, ['title', 'summary', 'description'])
- self.assertEqual('title_en', res['title'])
- self.assertEqual('summary_en', res['summary'])
- self.assertEqual('description_en', res['description'])
-
- self.stop_servers()
- os.environ['LANG'] = 'ru_RU'
- self.start_server()
- client = Client('~')
-
- res = client.Context(guid, ['title', 'summary', 'description'])
- self.assertEqual('title_ru', res['title'])
- self.assertEqual('summary_ru', res['summary'])
- self.assertEqual('description_ru', res['description'])
+ local = IPCClient(mountpoint='~')
+ events = []
+
+ def read_events():
+ for event in local.subscribe():
+ if 'props' in event:
+ event.pop('props')
+ events.append(event)
+ job = coroutine.spawn(read_events)
+
+ guid = local.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+ coroutine.dispatch()
+ local.put(['context', guid], {
+ 'title': 'title_2',
+ })
+ coroutine.dispatch()
+ local.delete(['context', guid])
+ coroutine.sleep(.5)
+ job.kill()
+
+ self.assertEqual([
+ {'guid': guid, 'seqno': 1, 'document': 'context', 'event': 'create'},
+ {'guid': guid, 'seqno': 2, 'document': 'context', 'event': 'update', 'mountpoint': '~'},
+ {'guid': guid, 'event': 'delete', 'document': 'context', 'mountpoint': '~'},
+ ],
+ events)
if __name__ == '__main__':
diff --git a/tests/units/injector.py b/tests/units/injector.py
index e178e87..a69a2cf 100755
--- a/tests/units/injector.py
+++ b/tests/units/injector.py
@@ -11,24 +11,26 @@ from os.path import exists
from __init__ import tests
from active_toolkit import coroutine
-from sugar_network import checkin, launch, Client
+from sugar_network import checkin, launch
from sugar_network.resources.user import User
from sugar_network.resources.context import Context
from sugar_network.resources.implementation import Implementation
from sugar_network.local import activities
+from sugar_network import IPCClient
class InjectorTest(tests.Test):
def test_checkin_Online(self):
self.start_ipc_and_restful_server([User, Context, Implementation])
- client = Client('/')
+ remote = IPCClient(mountpoint='/')
- context = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
+ context = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
blob_path = 'remote/context/%s/%s/feed' % (context[:2], context)
self.touch(
@@ -48,13 +50,14 @@ class InjectorTest(tests.Test):
],
[i for i in pipe])
- impl = client.Implementation(
- context=context,
- license=['GPLv3+'],
- version='1',
- date=0,
- stability='stable',
- notes='').post()
+ impl = remote.post(['implementation'], {
+ 'context': context,
+ 'license': 'GPLv3+',
+ 'version': '1',
+ 'date': 0,
+ 'stability': 'stable',
+ 'notes': '',
+ })
blob_path = 'remote/context/%s/%s/feed' % (context[:2], context)
self.touch(
@@ -108,20 +111,22 @@ class InjectorTest(tests.Test):
def test_launch_Online(self):
self.start_ipc_and_restful_server([User, Context, Implementation])
- client = Client('/')
+ remote = IPCClient(mountpoint='/')
- context = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
- impl = client.Implementation(
- context=context,
- license=['GPLv3+'],
- version='1',
- date=0,
- stability='stable',
- notes='').post()
+ context = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+ impl = remote.post(['implementation'], {
+ 'context': context,
+ 'license': 'GPLv3+',
+ 'version': '1',
+ 'date': 0,
+ 'stability': 'stable',
+ 'notes': '',
+ })
blob_path = 'remote/context/%s/%s/feed' % (context[:2], context)
self.touch(
@@ -172,13 +177,14 @@ class InjectorTest(tests.Test):
],
[i for i in pipe])
- impl_2 = client.Implementation(
- context=context,
- license=['GPLv3+'],
- version='1',
- date=0,
- stability='stable',
- notes='').post()
+ impl_2 = remote.post(['implementation'], {
+ 'context': context,
+ 'license': 'GPLv3+',
+ 'version': '1',
+ 'date': 0,
+ 'stability': 'stable',
+ 'notes': '',
+ })
os.unlink('cache/context/%s/%s/feed.meta' % (context[:2], context))
blob_path = 'remote/context/%s/%s/feed' % (context[:2], context)
@@ -257,12 +263,13 @@ class InjectorTest(tests.Test):
]))
self.start_server()
- client = Client('~')
+ client = IPCClient(mountpoint='~')
monitor = coroutine.spawn(activities.monitor,
self.mounts.home_volume['context'], ['Activities'])
coroutine.sleep()
+ blob = client.get(['context', 'bundle_id', 'feed'], cmd='get_blob')
self.assertEqual(
json.dumps({
'1': {
@@ -288,7 +295,7 @@ class InjectorTest(tests.Test):
},
},
}),
- client.Context('bundle_id').get_blob('feed').read())
+ blob)
if __name__ == '__main__':
diff --git a/tests/units/ipc_client.py b/tests/units/ipc_client.py
deleted file mode 100755
index b86ac18..0000000
--- a/tests/units/ipc_client.py
+++ /dev/null
@@ -1,347 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import os
-from os.path import isdir
-
-from __init__ import tests
-
-import active_document as ad
-
-from active_toolkit import coroutine
-from sugar_network.resources.user import User
-from sugar_network.resources.context import Context
-from sugar_network.local.mounts import HomeMount
-from sugar_network.local.mountset import Mountset
-from sugar_network import Client
-from sugar_network.resources.volume import Volume
-
-
-class IPCClientTest(tests.Test):
-
- def test_RealtimeUpdates(self):
- self.start_server()
-
- client_1 = Client('~')
- client_2 = Client('~')
-
- events_1 = []
- events_2 = []
-
- def waiter(cursor, events):
- for i in cursor.read_events():
- events.append(i)
-
- cursor_1 = client_1.Context.cursor(reply=['guid', 'title'])
- self.assertEqual(0, cursor_1.total)
- coroutine.spawn(waiter, cursor_1, events_1)
-
- cursor_2 = client_2.Context.cursor(reply=['guid', 'title'])
- self.assertEqual(0, cursor_2.total)
- coroutine.spawn(waiter, cursor_2, events_2)
-
- guid_1 = client_1.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
- coroutine.sleep(.1)
- self.assertEqual([None], events_1)
- self.assertEqual([None], events_2)
- self.assertEqual(
- sorted([
- (guid_1, 'title'),
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_1]))
- self.assertEqual(
- sorted([
- (guid_1, 'title'),
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_2]))
-
- client_1.Context(guid_1, title='title-2').post()
- coroutine.sleep(.1)
- self.assertEqual([None], events_1[1:])
- self.assertEqual([None], events_2[1:])
- self.assertEqual(
- sorted([
- (guid_1, 'title-2'),
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_1]))
- self.assertEqual(
- sorted([
- (guid_1, 'title-2'),
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_2]))
-
- guid_2 = client_2.Context(
- type='activity',
- title='title-3',
- summary='summary',
- description='description').post()
- coroutine.sleep(.1)
- self.assertEqual([None], events_1[2:])
- self.assertEqual([None], events_2[2:])
- self.assertEqual(
- sorted([
- (guid_1, 'title-2'),
- (guid_2, 'title-3'),
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_1]))
- self.assertEqual(
- sorted([
- (guid_1, 'title-2'),
- (guid_2, 'title-3'),
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_2]))
-
- client_2.Context(guid_2, title='title-4').post()
- coroutine.sleep(.1)
- self.assertEqual([None], events_1[3:])
- self.assertEqual([None], events_2[3:])
- self.assertEqual(
- sorted([
- (guid_1, 'title-2'),
- (guid_2, 'title-4'),
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_1]))
- self.assertEqual(
- sorted([
- (guid_1, 'title-2'),
- (guid_2, 'title-4'),
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_2]))
-
- client_1.Context.delete(guid_1)
- coroutine.sleep(.1)
- self.assertEqual([None], events_1[4:])
- self.assertEqual([None], events_2[4:])
- self.assertEqual(
- sorted([
- (guid_2, 'title-4'),
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_1]))
- self.assertEqual(
- sorted([
- (guid_2, 'title-4'),
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_2]))
-
- client_2.Context.delete(guid_2)
- coroutine.sleep(.1)
- self.assertEqual([None], events_1[5:])
- self.assertEqual([None], events_2[5:])
- self.assertEqual(
- sorted([
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_1]))
- self.assertEqual(
- sorted([
- ]),
- sorted([(i['guid'], i['title']) for i in cursor_2]))
-
- def test_ReplaceReadEventsCalls(self):
- self.start_server()
-
- client = Client('~')
- cursor = client.Context.cursor(reply=['guid', 'title'])
-
- def waiter(cursor, events):
- for i in cursor.read_events():
- events.append(i)
-
- events_1 = []
- coroutine.spawn(waiter, cursor, events_1)
- coroutine.sleep()
-
- events_2 = []
- coroutine.spawn(waiter, cursor, events_2)
- coroutine.sleep()
-
- guid = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
- coroutine.sleep()
-
- self.assertEqual([], events_1)
- self.assertEqual([None], events_2)
-
- events_3 = []
- coroutine.spawn(waiter, cursor, events_3)
- coroutine.sleep()
-
- client.Context(guid, title='title-1').post()
- coroutine.sleep()
-
- self.assertEqual([], events_1)
- self.assertEqual([None], events_2)
- self.assertEqual([None], events_3)
-
- def test_Cursor_Gets(self):
- self.start_server()
-
- client = Client('~')
-
- guid_1 = client.Context(
- type='activity',
- title='title-1',
- summary='summary',
- description='description').post()
- guid_2 = client.Context(
- type='activity',
- title='title-2',
- summary='summary',
- description='description').post()
- guid_3 = client.Context(
- type='activity',
- title='title-3',
- summary='summary',
- description='description').post()
-
- cursor = client.Context.cursor(reply=['guid', 'title'])
- self.assertEqual('title-1', cursor[0]['title'])
- self.assertEqual('title-2', cursor[1]['title'])
- self.assertEqual('title-3', cursor[2]['title'])
- self.assertEqual('title-1', cursor[guid_1]['title'])
- self.assertEqual('title-2', cursor[guid_2]['title'])
- self.assertEqual('title-3', cursor[guid_3]['title'])
-
- cursor = client.Context.cursor('FOO', reply=['guid', 'title'])
- self.assertEqual(0, cursor.total)
- self.assertEqual('title-1', cursor[guid_1]['title'])
- self.assertEqual('title-2', cursor[guid_2]['title'])
- self.assertEqual('title-3', cursor[guid_3]['title'])
-
- def test_ConnectEventsInCursor(self):
-
- def remote_server():
- coroutine.sleep(1)
- self.restful_server()
-
- pid = self.fork(self.restful_server)
-
- self.start_server()
- client = Client('/')
-
- events = []
- cursor = client.Context.cursor(reply=['guid', 'title'])
-
- def waiter():
- for i in cursor.read_events():
- events.append(i)
-
- coroutine.spawn(waiter)
- coroutine.sleep(.1)
-
- self.assertEqual([], events)
-
- coroutine.sleep(1)
-
- self.assertEqual([None], events)
-
- self.waitpid(pid)
- coroutine.sleep(1)
-
- self.assertEqual([None, None], events)
-
- def test_PublishEvents(self):
- self.start_server()
-
- events = []
- Client.connect(lambda event: events.append(event))
-
- Client.publish('probe', payload=1)
- Client.publish('probe', payload=2)
- Client.publish('probe', payload=3)
- coroutine.sleep()
-
- self.assertEqual([
- {'payload': 1, 'event': 'probe'},
- {'payload': 2, 'event': 'probe'},
- {'payload': 3, 'event': 'probe'},
- ],
- events)
-
- def test_GetBLOBs(self):
-
- class Mounts(object):
-
- def call(self_, request, response):
- if not (request['cmd'] == 'get_blob' and \
- request['document'] == 'document' and \
- 'guid' in request):
- return
- if request['prop'] == 'directory':
- os.makedirs('directory')
- return {'path': tests.tmpdir + '/directory', 'mime_type': 'fake'}
- elif request['prop'] == 'file':
- self.touch(('file', 'file'))
- return {'path': tests.tmpdir + '/file', 'mime_type': 'fake'}
- elif request['prop'] == 'value':
- return 'value'
-
- def close(self):
- pass
-
- self.start_server([])
- self.server._mounts = Mounts()
- client = Client('~')
-
- blob = client.Document('guid').get_blob_path('directory')
- self.assertEqual(tests.tmpdir + '/directory', blob[0])
- self.assertEqual('fake', blob[1])
- assert isdir('directory')
-
- blob = client.Document('guid').get_blob('file')
- self.assertEqual(tests.tmpdir + '/file', blob.name)
- self.assertEqual('fake', blob.mime_type)
- self.assertEqual('file', blob.read())
-
- blob = client.Document('guid').get_blob('value')
- self.assertEqual('value', blob.read())
-
- def test_Direct(self):
- volume = Volume('local', [User, Context])
- Client._connection = Mountset(volume)
- Client._connection['~'] = HomeMount(volume)
- Client._connection.open()
- client = Client('~')
-
- guid_1 = client.Context(
- type='activity',
- title='title-1',
- summary='summary',
- description='description').post()
- guid_2 = client.Context(
- type='activity',
- title='title-2',
- summary='summary',
- description='description').post()
-
- self.assertEqual(
- 'title-1',
- client.Context(guid_1, reply=['title'])['title'])
-
- self.assertEqual(
- sorted([
- (guid_1, 'title-1'),
- (guid_2, 'title-2'),
- ]),
- sorted([(i.guid, i['title']) \
- for i in client.Context.cursor(reply=['title'])]))
-
- self.touch(('file', 'blob'))
- client.Context(guid_2).upload_blob('preview', 'file')
- self.assertEqual(
- 'blob',
- client.Context(guid_2).get_blob('preview').read())
-
- client.Context.delete(guid_1)
- client.Context.delete(guid_2)
- self.assertEqual(0, client.Context.cursor().total)
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/mountset.py b/tests/units/mountset.py
index 9188509..c26435c 100755
--- a/tests/units/mountset.py
+++ b/tests/units/mountset.py
@@ -11,21 +11,27 @@ from __init__ import tests
import active_document as ad
from active_toolkit import coroutine, sockets
from sugar_network.local.mountset import Mountset
-from sugar_network.local.bus import IPCServer
from sugar_network.resources.user import User
from sugar_network.resources.context import Context
from sugar_network.toolkit import http, mounts_monitor
-from sugar_network import local, Client, ServerError, sugar, node
+from sugar_network import local, sugar, node
from sugar_network.resources.volume import Volume
from sugar_network.local.mounts import HomeMount, RemoteMount
+from sugar_network.local.ipc_client import Router as IPCRouter
+from sugar_network import IPCClient, Client
class MountsetTest(tests.Test):
def setUp(self):
tests.Test.setUp(self)
+ self.events_job = None
+ self.events = []
+ self.mounted = coroutine.Event()
def tearDown(self):
+ if self.events_job is not None:
+ self.events_job.kill()
tests.Test.tearDown(self)
def mountset(self):
@@ -33,21 +39,23 @@ class MountsetTest(tests.Test):
volume = Volume('local', [User, Context])
mounts = Mountset(volume)
- Client._connection = mounts
- self.mounted = coroutine.Event()
+ self.server = coroutine.WSGIServer(
+ ('localhost', local.ipc_port.value), IPCRouter(mounts))
+ coroutine.spawn(self.server.serve_forever)
+ mounts.open()
+ mounts.opened.wait()
- def events_cb(event):
- if event['event'] in ('mount', 'unmount'):
- self.mount_events.append((event['event'], event['mountpoint']))
+ def read_events():
+ for event in IPCClient().subscribe():
+ if 'props' in event:
+ event.pop('props')
+ self.events.append(event)
self.mounted.set()
- self.mount_events = []
- Client.connect(events_cb)
-
- mounts.open()
- mounts.opened.wait()
+ coroutine.dispatch()
+ self.events_job = coroutine.spawn(read_events)
+ coroutine.sleep(.5)
mounts_monitor.start(tests.tmpdir)
- # Let `open()` start processing spawned jobs
coroutine.dispatch()
return mounts
@@ -56,183 +64,118 @@ class MountsetTest(tests.Test):
os.makedirs('1/.sugar-network')
os.makedirs('2/.sugar-network')
- self.mountset()
+ mounts = self.mountset()
+ mounts[tests.tmpdir + '/1'].mounted.wait()
+ mounts[tests.tmpdir + '/2'].mounted.wait()
- self.mounted.wait()
- self.mounted.clear()
self.assertEqual([
- ('mount', tests.tmpdir + '/1'),
- ('mount', tests.tmpdir + '/2'),
+ {'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'},
+ {'mountpoint': tests.tmpdir + '/2', 'event': 'mount', 'private': True, 'name': '2'},
],
- self.mount_events)
-
- Client(tests.tmpdir + '/1').Context(
- type='activity',
- title='remote',
- summary='summary',
- description='description').post()
- Client(tests.tmpdir + '/2').Context(
- type='activity',
- title='remote',
- summary='summary',
- description='description').post()
+ self.events)
self.assertEqual(
sorted([
{'mountpoint': tests.tmpdir + '/1', 'name': '1', 'private': True},
{'mountpoint': tests.tmpdir + '/2', 'name': '2', 'private': True},
]),
- sorted(Client.mounts()))
+ sorted(IPCClient().get(cmd='mounts')))
def test_Mount(self):
- self.mountset()
+ mounts = self.mountset()
os.makedirs('tmp/1/.sugar-network')
shutil.move('tmp/1', '.')
-
self.mounted.wait()
self.mounted.clear()
- self.assertEqual(
- [('mount', tests.tmpdir + '/1')],
- self.mount_events)
+
+ self.assertEqual([
+ {'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'},
+ ],
+ self.events)
self.assertEqual(
sorted([
{'mountpoint': tests.tmpdir + '/1', 'name': '1', 'private': True},
]),
- sorted(Client.mounts()))
- Client(tests.tmpdir + '/1').Context(
- type='activity',
- title='remote',
- summary='summary',
- description='description').post()
+ sorted(IPCClient().get(cmd='mounts')))
os.makedirs('tmp/2/.sugar-network')
shutil.move('tmp/2', '.')
-
self.mounted.wait()
self.mounted.clear()
- self.assertEqual(
- [('mount', tests.tmpdir + '/2')],
- self.mount_events[1:])
+
+ self.assertEqual([
+ {'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'},
+ {'mountpoint': tests.tmpdir + '/2', 'event': 'mount', 'private': True, 'name': '2'},
+ ],
+ self.events)
self.assertEqual(
sorted([
{'mountpoint': tests.tmpdir + '/1', 'name': '1', 'private': True},
{'mountpoint': tests.tmpdir + '/2', 'name': '2', 'private': True},
]),
- sorted(Client.mounts()))
- Client(tests.tmpdir + '/2').Context(
- type='activity',
- title='remote',
- summary='summary',
- description='description').post()
+ sorted(IPCClient().get(cmd='mounts')))
def test_Unmount(self):
os.makedirs('1/.sugar-network')
os.makedirs('2/.sugar-network')
- self.mountset()
- self.mounted.wait()
- self.mounted.clear()
+ mounts = self.mountset()
+ client = IPCClient()
+
self.assertEqual(
sorted([
{'mountpoint': tests.tmpdir + '/1', 'name': '1', 'private': True},
{'mountpoint': tests.tmpdir + '/2', 'name': '2', 'private': True},
]),
- sorted(Client.mounts()))
+ sorted(client.get(cmd='mounts')))
- del self.mount_events[:]
+ self.mounted.clear()
+ del self.events[:]
shutil.rmtree('1')
self.mounted.wait()
self.mounted.clear()
- self.assertEqual(
- [('unmount', tests.tmpdir + '/1')],
- self.mount_events)
+
+ self.assertEqual([
+ {'mountpoint': tests.tmpdir + '/1', 'event': 'unmount', 'private': True, 'name': '1'},
+ ],
+ self.events)
self.assertEqual(
sorted([
{'mountpoint': tests.tmpdir + '/2', 'name': '2', 'private': True},
]),
- sorted(Client.mounts()))
+ sorted(client.get(cmd='mounts')))
def test_MountNode(self):
local.server_mode.value = True
- self.mountset()
+ mounts = self.mountset()
self.touch('tmp/mnt/.sugar-network')
self.touch(('tmp/mnt/node', 'node'))
shutil.move('tmp/mnt', '.')
-
self.mounted.wait()
self.mounted.clear()
- client = Client(tests.tmpdir + '/mnt')
+ self.assertEqual([
+ {'mountpoint': tests.tmpdir + '/mnt', 'event': 'mount', 'private': False, 'name': 'mnt'},
+ ],
+ self.events)
self.assertEqual(
sorted([
{'mountpoint': tests.tmpdir + '/mnt', 'name': 'mnt', 'private': False},
]),
- sorted(Client.mounts()))
-
- guid = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
-
- local.api_url.value = 'http://localhost:%s' % node.port.value
- self.assertEqual(
- {'guid': guid, 'title': {'en-US': 'title'}},
- http.request('GET', ['context', guid], params={'reply': 'guid,title'}))
-
- def test_RootRemount(self):
- mountset = self.mountset()
- client = Client('/')
-
- mountset['/'] = RemoteMount(mountset.home_volume)
- self.assertEqual(['/'], [i['mountpoint'] for i in mountset.mounts()])
- assert not mountset['/'].mounted.is_set()
- self.assertEqual(0, client.Context.cursor().total)
- self.assertRaises(RuntimeError, client.Context(type='activity', title='', summary='', description='').post)
-
- pid = self.fork(self.restful_server)
- coroutine.sleep(1)
-
- self.assertEqual([], self.mount_events)
- self.assertRaises(RuntimeError, client.Context(type='activity', title='', summary='', description='').post)
- self.mounted.wait()
- self.mounted.clear()
- assert mountset['/'].mounted.is_set()
- self.assertEqual([('mount', '/')], self.mount_events)
- del self.mount_events[:]
-
- guid = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description',
- ).post()
+ sorted(IPCClient().get(cmd='mounts')))
+
+ client = Client('http://localhost:%s' % node.port.value)
+ guid = client.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
self.assertEqual(
- [guid],
- [i['guid'] for i in client.Context.cursor()])
-
- self.waitpid(pid)
-
- self.mounted.wait()
- self.mounted.clear()
- assert not mountset['/'].mounted.is_set()
- self.assertEqual([('unmount', '/')], self.mount_events)
- del self.mount_events[:]
- self.assertEqual(0, client.Context.cursor().total)
- self.assertRaises(RuntimeError, client.Context(type='activity', title='', summary='', description='').post)
-
- pid = self.fork(self.restful_server)
- coroutine.sleep(1)
-
- self.assertEqual([], self.mount_events)
- self.assertRaises(RuntimeError, client.Context(type='activity', title='', summary='', description='').post)
- self.mounted.wait()
- self.mounted.clear()
- assert mountset['/'].mounted.is_set()
- self.assertEqual([('mount', '/')], self.mount_events)
- del self.mount_events[:]
+ 'title',
+ client.get(['context', guid, 'title']))
if __name__ == '__main__':
diff --git a/tests/units/node.py b/tests/units/node.py
index 4ddae32..66e92e5 100755
--- a/tests/units/node.py
+++ b/tests/units/node.py
@@ -8,7 +8,8 @@ from __init__ import tests
import active_document as ad
from sugar_network import node
-from sugar_network.node import stats, Unauthorized
+from sugar_network.toolkit.router import Unauthorized
+from sugar_network.node import stats
from sugar_network.node.commands import NodeCommands
from sugar_network.resources.volume import Volume
from sugar_network.resources.user import User
@@ -102,9 +103,10 @@ class NodeTest(tests.Test):
assert exists(guid_path)
self.assertEqual({
'guid': guid,
- 'title': {'en': 'title'},
+ 'title': 'title',
+ 'layer': ['public'],
},
- call(cp, method='GET', document='context', guid=guid, reply=['guid', 'title']))
+ call(cp, method='GET', document='context', guid=guid, reply=['guid', 'title', 'layer']))
self.assertEqual(['public'], volume['context'].get(guid)['layer'])
call(cp, method='DELETE', document='context', guid=guid, principal='principal')
@@ -296,30 +298,30 @@ class NodeTest(tests.Test):
volume['context'].set_blob(guid5, 'icon', url={'file1': {'order': 1, 'url': '/1'}, 'file2': {'order': 2, 'url': 'http://2'}})
self.assertEqual(
- {'guid': guid1, 'icon': 'http://localhost:8000/static/images/missing.png'},
- call(cp, method='GET', document='context', guid=guid1, reply=['guid', 'icon']))
+ {'guid': guid1, 'icon': 'http://localhost:8000/static/images/missing.png', 'layer': ['public']},
+ call(cp, method='GET', document='context', guid=guid1, reply=['guid', 'icon', 'layer']))
self.assertEqual(
- {'guid': guid2, 'icon': 'http://foo/bar'},
- call(cp, method='GET', document='context', guid=guid2, reply=['guid', 'icon']))
+ {'guid': guid2, 'icon': 'http://foo/bar', 'layer': ['public']},
+ call(cp, method='GET', document='context', guid=guid2, reply=['guid', 'icon', 'layer']))
self.assertEqual(
- {'guid': guid3, 'icon': 'http://localhost:8000/foo/bar'},
- call(cp, method='GET', document='context', guid=guid3, reply=['guid', 'icon']))
+ {'guid': guid3, 'icon': 'http://localhost:8000/foo/bar', 'layer': ['public']},
+ call(cp, method='GET', document='context', guid=guid3, reply=['guid', 'icon', 'layer']))
self.assertEqual(
- {'guid': guid4, 'data': 'http://localhost:8000/report/%s/data' % guid4},
- call(cp, method='GET', document='report', guid=guid4, reply=['guid', 'data']))
+ {'guid': guid4, 'data': 'http://localhost:8000/report/%s/data' % guid4, 'layer': ['public']},
+ call(cp, method='GET', document='report', guid=guid4, reply=['guid', 'data', 'layer']))
self.assertEqual([
- {'guid': guid1, 'icon': 'http://localhost:8000/static/images/missing.png'},
- {'guid': guid2, 'icon': 'http://foo/bar'},
- {'guid': guid3, 'icon': 'http://localhost:8000/foo/bar'},
- {'guid': guid5, 'icon': ['http://localhost:8000/1', 'http://2']},
+ {'guid': guid1, 'icon': 'http://localhost:8000/static/images/missing.png', 'layer': ['public']},
+ {'guid': guid2, 'icon': 'http://foo/bar', 'layer': ['public']},
+ {'guid': guid3, 'icon': 'http://localhost:8000/foo/bar', 'layer': ['public']},
+ {'guid': guid5, 'icon': ['http://localhost:8000/1', 'http://2'], 'layer': ['public']},
],
- call(cp, method='GET', document='context', reply=['guid', 'icon'])['result'])
+ call(cp, method='GET', document='context', reply=['guid', 'icon', 'layer'])['result'])
self.assertEqual([
- {'guid': guid4, 'data': 'http://localhost:8000/report/%s/data' % guid4},
+ {'guid': guid4, 'data': 'http://localhost:8000/report/%s/data' % guid4, 'layer': ['public']},
],
- call(cp, method='GET', document='report', reply=['guid', 'data'])['result'])
+ call(cp, method='GET', document='report', reply=['guid', 'data', 'layer'])['result'])
def test_DeletedDocuments(self):
volume = Volume('db')
diff --git a/tests/units/node_mount.py b/tests/units/node_mount.py
index eed94f7..6e864a3 100755
--- a/tests/units/node_mount.py
+++ b/tests/units/node_mount.py
@@ -13,164 +13,149 @@ import active_document as ad
from active_toolkit import coroutine, sockets
from sugar_network.local.mounts import HomeMount
from sugar_network.local.mountset import Mountset
-from sugar_network.local.bus import IPCServer
from sugar_network.local import activities
from sugar_network.toolkit import mounts_monitor
from sugar_network.resources.user import User
from sugar_network.resources.context import Context
-from sugar_network import local, Client, sugar
+from sugar_network import local, sugar
from sugar_network.resources.volume import Volume
from sugar_network.resources.report import Report
+from sugar_network.local.ipc_client import Router as IPCRouter
+from sugar_network import IPCClient
class NodeMountTest(tests.Test):
def setUp(self):
tests.Test.setUp(self)
+ self.events_job = None
local.server_mode.value = True
def tearDown(self):
+ if self.events_job is not None:
+ self.events_job.kill()
tests.Test.tearDown(self)
- def start_server(self, ipc=False):
+ def start_server(self):
+ self.touch('mnt/.sugar-network')
+ self.touch(('mnt/node', 'node'))
local.mounts_root.value = tests.tmpdir
volume = Volume('local', [User, Context, Report])
- mounts = Mountset(volume)
- if ipc:
- self.server = IPCServer(mounts)
- coroutine.spawn(self.server.serve_forever)
- coroutine.dispatch()
- else:
- Client._connection = mounts
- self.got_event = coroutine.Event()
-
- def events_cb(event):
- if event['event'] in ('mount', 'unmount') and \
- event['mountpoint'].startswith(local.mounts_root.value):
- self.events.append((event['event'], event['mountpoint']))
- self.got_event.set()
-
- self.events = []
- Client.connect(events_cb)
-
- mounts.open()
+ self.mounts = Mountset(volume)
+ self.server = coroutine.WSGIServer(
+ ('localhost', local.ipc_port.value), IPCRouter(self.mounts))
+ coroutine.spawn(self.server.serve_forever)
+ self.mounts.open()
mounts_monitor.start(tests.tmpdir)
- mounts.opened.wait()
- # Let `open()` start processing spawned jobs
- coroutine.dispatch()
+ self.mounts.opened.wait()
- return mounts
+ return self.mounts
def test_GetKeep(self):
- self.touch('mnt/.sugar-network')
- self.touch(('mnt/node', 'node'))
mounts = self.start_server()
- self.got_event.wait()
-
- remote = Client(tests.tmpdir + '/mnt')
+ mounts[tests.tmpdir + '/mnt'].mounted.wait()
+ remote = IPCClient(mountpoint=tests.tmpdir + '/mnt')
- guid = remote.Context(
- type='activity',
- title='remote',
- summary='summary',
- description='description').post()
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
- context = remote.Context(guid, ['keep', 'keep_impl'])
+ context = remote.get(['context', guid], reply=['keep', 'keep_impl'])
self.assertEqual(False, context['keep'])
self.assertEqual(0, context['keep_impl'])
- self.assertEqual(
- [(guid, False, False)],
- [(i['guid'], i['keep'], i['keep_impl']) for i in remote.Context.cursor(reply=['keep', 'keep_impl'])])
+ self.assertEqual([
+ {'guid': guid, 'keep': False, 'keep_impl': False},
+ ],
+ remote.get(['context'], reply=['guid', 'keep', 'keep_impl'])['result'])
mounts.home_volume['context'].create(guid=guid, type='activity',
- title={'en': 'local'}, summary={'en': 'summary'},
- description={'en': 'description'}, keep=True, keep_impl=2,
+ title='local', summary='summary',
+ description='description', keep=True, keep_impl=2,
user=[sugar.uid()])
+ coroutine.sleep(1)
- context = remote.Context(guid, ['keep', 'keep_impl'])
+ context = remote.get(['context', guid], reply=['keep', 'keep_impl'])
self.assertEqual(True, context['keep'])
self.assertEqual(2, context['keep_impl'])
- self.assertEqual(
- [(guid, True, 2)],
- [(i['guid'], i['keep'], i['keep_impl']) for i in remote.Context.cursor(reply=['keep', 'keep_impl'])])
+ self.assertEqual([
+ {'guid': guid, 'keep': True, 'keep_impl': 2},
+ ],
+ remote.get(['context'], reply=['guid', 'keep', 'keep_impl'])['result'])
def test_SetKeep(self):
- self.touch('mnt/.sugar-network')
- self.touch(('mnt/node', 'node'))
mounts = self.start_server()
+ mounts[tests.tmpdir + '/mnt'].mounted.wait()
mounts['~'] = HomeMount(mounts.home_volume)
- self.got_event.wait()
- remote = Client(tests.tmpdir + '/mnt')
- local = Client('~')
-
- guid_1 = remote.Context(
- type=['activity'],
- title='remote',
- summary='summary',
- description='description').post()
- guid_2 = remote.Context(
- type=['activity'],
- title='remote-2',
- summary='summary',
- description='description').post()
-
- self.assertRaises(ad.NotFound, lambda: local.Context(guid_1, reply=['title'])['title'])
- self.assertRaises(ad.NotFound, lambda: local.Context(guid_2, reply=['title'])['title'])
-
- remote.Context(guid_1, keep=True).post()
-
- cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title'])
+ local = IPCClient(mountpoint='~')
+ remote = IPCClient(mountpoint=tests.tmpdir + '/mnt')
+
+ guid_1 = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'remote',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+ guid_2 = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'remote-2',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+
+ self.assertRaises(RuntimeError, local.get, ['context', guid_1])
+ self.assertRaises(RuntimeError, local.get, ['context', guid_2])
+
+ remote.put(['context', guid_1], {'keep': True})
+
self.assertEqual(
sorted([
- (guid_1, 'remote', True, 0),
+ {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
- cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title'])
+ sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
self.assertEqual(
sorted([
- (guid_1, 'remote', True, 0),
- (guid_2, 'remote-2', False, 0),
+ {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0},
+ {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
+ sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
- remote.Context(guid_1, keep=False).post()
+ remote.put(['context', guid_1], {'keep': False})
- cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title'])
self.assertEqual(
sorted([
- (guid_1, 'remote', False, 0),
+ {'guid': guid_1, 'title': 'remote', 'keep': False, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
- cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title'])
+ sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
self.assertEqual(
sorted([
- (guid_1, 'remote', False, 0),
- (guid_2, 'remote-2', False, 0),
+ {'guid': guid_1, 'title': 'remote', 'keep': False, 'keep_impl': 0},
+ {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
+ sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
- context = local.Context(guid_1)
- context['title'] = 'local'
- context.post()
- context = local.Context(guid_1, reply=['keep', 'keep_impl', 'title'])
- self.assertEqual('local', context['title'])
+ local.put(['context', guid_1], {'title': 'local'})
- remote.Context(guid_1, keep=True).post()
+ self.assertEqual(
+ {'title': 'local'},
+ local.get(['context', guid_1], reply=['title']))
+
+ remote.put(['context', guid_1], {'keep': True})
- cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title'])
self.assertEqual(
sorted([
- (guid_1, 'local', True, 0),
+ {'guid': guid_1, 'title': 'local', 'keep': True, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
- cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title'])
+ sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
self.assertEqual(
sorted([
- (guid_1, 'remote', True, 0),
- (guid_2, 'remote-2', False, 0),
+ {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0},
+ {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
+ sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
def test_SetKeepImpl(self):
Volume.RESOURCES = [
@@ -179,27 +164,28 @@ class NodeMountTest(tests.Test):
'sugar_network.resources.implementation',
]
- self.touch('mnt/.sugar-network')
- self.touch(('mnt/node', 'node'))
- mounts = self.start_server(ipc=True)
+ mounts = self.start_server()
+ mounts[tests.tmpdir + '/mnt'].mounted.wait()
mounts['~'] = HomeMount(mounts.home_volume)
- self.got_event.wait()
- remote = Client(tests.tmpdir + '/mnt')
- local = Client('~')
+ local = IPCClient(mountpoint='~')
+ remote = IPCClient(mountpoint=tests.tmpdir + '/mnt')
coroutine.spawn(activities.monitor, mounts.home_volume['context'], ['Activities'])
- context = remote.Context(
- type=['activity'],
- title='remote',
- summary='summary',
- description='description').post()
- impl = remote.Implementation(
- context=context,
- license=['GPLv3+'],
- version='1',
- date=0,
- stability='stable',
- notes='').post()
+ context = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'remote',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+ impl = remote.post(['implementation'], {
+ 'context': context,
+ 'license': 'GPLv3+',
+ 'version': '1',
+ 'date': 0,
+ 'stability': 'stable',
+ 'notes': '',
+ })
+
with file('mnt/context/%s/%s/feed' % (context[:2], context), 'w') as f:
json.dump({
'seqno': 0,
@@ -233,125 +219,120 @@ class NodeMountTest(tests.Test):
'license=Public Domain',
]))
bundle.close()
- remote.Implementation(impl).upload_blob('data', 'bundle')
+ remote.put(['implementation', impl, 'data'], cmd='upload_blob', path=abspath('bundle'))
- remote.Context(context, keep_impl=1).post()
+ remote.put(['context', context], {'keep_impl': 1})
+ coroutine.sleep(1)
- cursor = local.Context.cursor(reply=['keep_impl', 'title'])
+ cursor = local.get(['context'], reply=['guid', 'keep_impl', 'title'])['result']
self.assertEqual([
- (context, 'remote', 2),
+ {'guid': context, 'title': 'remote', 'keep_impl': 2},
],
- [(i.guid, i['title'], i['keep_impl']) for i in cursor])
+ cursor)
assert exists('Activities/TestActivitry/activity/activity.info')
def test_Events(self):
- self.touch('mnt/.sugar-network')
- self.touch(('mnt/node', 'node'))
- self.start_server()
- self.got_event.wait()
- self.got_event.clear()
- client = Client(tests.tmpdir + '/mnt')
-
- def events_cb(event):
- if 'props' in event:
- event.pop('props')
- events.append(event)
- got_commit.set()
- got_commit.clear()
+ mounts = self.start_server()
+ mounts[tests.tmpdir + '/mnt'].mounted.wait()
+ remote = IPCClient(mountpoint=tests.tmpdir + '/mnt')
events = []
- got_commit = coroutine.Event()
- Client.connect(events_cb)
-
- guid = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
- got_commit.wait()
+ got_event = coroutine.Event()
+
+ def read_events():
+ for event in remote.subscribe():
+ if 'props' in event:
+ event.pop('props')
+ events.append(event)
+ got_event.set()
+ job = coroutine.spawn(read_events)
+
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+ got_event.wait()
+ got_event.clear()
self.assertEqual([
{'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'create', 'guid': guid, 'seqno': 1},
- {'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'commit', 'seqno': 1},
],
events)
del events[:]
- client.Context(guid, title='new-title').post()
- got_commit.wait()
+ remote.put(['context', guid], {'title': 'new-title'})
+ got_event.wait()
+ got_event.clear()
self.assertEqual([
{'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'update', 'guid': guid, 'seqno': 2},
- {'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'commit', 'seqno': 2},
],
events)
del events[:]
guid_path = 'mnt/context/%s/%s' % (guid[:2], guid)
assert exists(guid_path)
- client.Context.delete(guid)
+ remote.delete(['context', guid])
assert not exists(guid_path)
- got_commit.wait()
+ got_event.wait()
+ got_event.clear()
self.assertEqual([
{'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'delete', 'guid': guid},
- {'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'commit', 'seqno': 2},
],
events)
del events[:]
def test_upload_blob(self):
- self.touch('mnt/.sugar-network')
- self.touch(('mnt/node', 'node'))
- self.start_server()
- self.got_event.wait()
- remote = Client(tests.tmpdir + '/mnt')
+ mounts = self.start_server()
+ mounts[tests.tmpdir + '/mnt'].mounted.wait()
+ remote = IPCClient(mountpoint=tests.tmpdir + '/mnt')
- guid = remote.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
self.touch(('file', 'blob'))
- remote.Context(guid).upload_blob('preview', 'file')
- self.assertEqual('blob', remote.Context(guid).get_blob('preview').read())
+ remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file'))
+ blob = remote.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob', file(blob['path']).read())
self.touch(('file2', 'blob2'))
- remote.Context(guid).upload_blob('preview', 'file2', pass_ownership=True)
- self.assertEqual('blob2', remote.Context(guid).get_blob('preview').read())
+ remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file2'), pass_ownership=True)
+ blob = remote.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob2', file(blob['path']).read())
assert not exists('file2')
- def test_GetAbsetnBLOB(self):
- self.touch('mnt/.sugar-network')
- self.touch(('mnt/node', 'node'))
- self.start_server()
- self.got_event.wait()
- client = Client(tests.tmpdir + '/mnt')
+ def test_GetAbsentBLOB(self):
+ mounts = self.start_server()
+ mounts[tests.tmpdir + '/mnt'].mounted.wait()
+ remote = IPCClient(mountpoint=tests.tmpdir + '/mnt')
- guid = client.Report(
- context='context',
- implementation='implementation',
- description='description').post()
+ guid = remote.post(['report'], {
+ 'context': 'context',
+ 'implementation': 'implementation',
+ 'description': 'description',
+ })
- path, mime_type = client.Report(guid).get_blob_path('data')
- self.assertEqual(None, path)
- self.assertEqual(True, client.Report(guid).get_blob('data').closed)
+ self.assertEqual(None, remote.get(['report', guid, 'data'], cmd='get_blob'))
def test_GetDefaultBLOB(self):
- self.touch('mnt/.sugar-network')
- self.touch(('mnt/node', 'node'))
- self.start_server()
- self.got_event.wait()
- client = Client(tests.tmpdir + '/mnt')
+ mounts = self.start_server()
+ mounts[tests.tmpdir + '/mnt'].mounted.wait()
+ remote = IPCClient(mountpoint=tests.tmpdir + '/mnt')
- guid = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
- path, mime_type = client.Context(guid).get_blob_path('icon')
- assert path.endswith('missing.png')
- assert exists(path)
- self.assertEqual(False, client.Context(guid).get_blob('icon').closed)
+ blob = remote.get(['context', guid, 'icon'], cmd='get_blob')
+ assert blob['path'].endswith('missing.png')
+ assert exists(blob['path'])
def test_get_blob_ExtractImplementations(self):
Volume.RESOURCES = [
@@ -359,28 +340,27 @@ class NodeMountTest(tests.Test):
'sugar_network.resources.implementation',
]
- self.touch('mnt/.sugar-network')
- self.touch(('mnt/node', 'node'))
- self.start_server()
- self.got_event.wait()
- remote = Client(tests.tmpdir + '/mnt')
-
- guid = remote.Implementation(
- context='context',
- license=['GPLv3+'],
- version='1',
- date=0,
- stability='stable',
- notes='').post()
+ mounts = self.start_server()
+ mounts[tests.tmpdir + '/mnt'].mounted.wait()
+ remote = IPCClient(mountpoint=tests.tmpdir + '/mnt')
+
+ guid = remote.post(['implementation'], {
+ 'context': 'context',
+ 'license': 'GPLv3+',
+ 'version': '1',
+ 'date': 0,
+ 'stability': 'stable',
+ 'notes': '',
+ })
bundle = zipfile.ZipFile('bundle', 'w')
bundle.writestr('probe', 'probe')
bundle.close()
- remote.Implementation(guid).upload_blob('data', 'bundle')
+ remote.put(['implementation', guid, 'data'], cmd='upload_blob', path=abspath('bundle'))
- path, __ = remote.Implementation(guid).get_blob_path('data')
- self.assertEqual(abspath('cache/implementation/%s/%s/data' % (guid[:2], guid)), path)
- self.assertEqual('probe', file(join(path, 'probe')).read())
+ blob = remote.get(['implementation', guid, 'data'], cmd='get_blob')
+ self.assertEqual(abspath('cache/implementation/%s/%s/data' % (guid[:2], guid)), blob['path'])
+ self.assertEqual('probe', file(join(blob['path'], 'probe')).read())
if __name__ == '__main__':
diff --git a/tests/units/remote_mount.py b/tests/units/remote_mount.py
index b06e686..7ac5610 100755
--- a/tests/units/remote_mount.py
+++ b/tests/units/remote_mount.py
@@ -5,380 +5,348 @@ import os
import json
import socket
from cStringIO import StringIO
-from os.path import exists
+from os.path import exists, abspath
from __init__ import tests
from active_toolkit import sockets, coroutine
-from sugar_network import Client, ServerError
+from sugar_network import local
+from sugar_network.local.ipc_client import Router as IPCRouter
from sugar_network.local.mounts import RemoteMount
from sugar_network.local.mountset import Mountset
-from sugar_network.local.bus import IPCServer
from sugar_network.toolkit import sugar, http
from sugar_network.resources.user import User
from sugar_network.resources.context import Context
from sugar_network.resources.report import Report
from sugar_network.resources.volume import Volume
+from sugar_network import IPCClient
class RemoteMountTest(tests.Test):
def test_GetKeep(self):
self.start_ipc_and_restful_server()
+ remote = IPCClient(mountpoint='/')
- remote = Client('/')
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
- guid = remote.Context(
- type='activity',
- title='remote',
- summary='summary',
- description='description').post()
-
- context = remote.Context(guid, ['keep', 'keep_impl'])
+ context = remote.get(['context', guid], reply=['keep', 'keep_impl'])
self.assertEqual(False, context['keep'])
self.assertEqual(0, context['keep_impl'])
+ cursor = remote.get(['context'], reply=['keep', 'keep_impl'])['result']
self.assertEqual(
[(guid, False, False)],
- [(i['guid'], i['keep'], i['keep_impl']) for i in remote.Context.cursor(reply=['keep', 'keep_impl'])])
+ [(i['guid'], i['keep'], i['keep_impl']) for i in cursor])
self.mounts.home_volume['context'].create(guid=guid, type='activity',
title={'en': 'local'}, summary={'en': 'summary'},
description={'en': 'description'}, keep=True, keep_impl=2,
user=[sugar.uid()])
- context = remote.Context(guid, ['keep', 'keep_impl'])
+ context = remote.get(['context', guid], reply=['keep', 'keep_impl'])
self.assertEqual(True, context['keep'])
self.assertEqual(2, context['keep_impl'])
+ cursor = remote.get(['context'], reply=['keep', 'keep_impl'])['result']
self.assertEqual(
[(guid, True, 2)],
- [(i['guid'], i['keep'], i['keep_impl']) for i in remote.Context.cursor(reply=['keep', 'keep_impl'])])
+ [(i['guid'], i['keep'], i['keep_impl']) for i in cursor])
def test_SetKeep(self):
self.start_ipc_and_restful_server()
+ remote = IPCClient(mountpoint='/')
+ local = IPCClient(mountpoint='~')
+
+ guid_1 = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'remote',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+ guid_2 = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'remote-2',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+
+ self.assertRaises(RuntimeError, local.get, ['context', guid_1])
+ self.assertRaises(RuntimeError, local.get, ['context', guid_2])
+
+ remote.put(['context', guid_1], {'keep': True})
- remote = Client('/')
- local = Client('~')
-
- guid_1 = remote.Context(
- type=['activity'],
- title='remote',
- summary='summary',
- description='description').post()
- guid_2 = remote.Context(
- type=['activity'],
- title='remote-2',
- summary='summary',
- description='description').post()
-
- self.assertRaises(ServerError, lambda: local.Context(guid_1, reply=['title'])['title'])
- self.assertRaises(ServerError, lambda: local.Context(guid_2, reply=['title'])['title'])
-
- remote.Context(guid_1, keep=True).post()
-
- cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title'])
self.assertEqual(
sorted([
- (guid_1, 'remote', True, 0),
+ {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
- cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title'])
+ sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
self.assertEqual(
sorted([
- (guid_1, 'remote', True, 0),
- (guid_2, 'remote-2', False, 0),
+ {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0},
+ {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
+ sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
- remote.Context(guid_1, keep=False).post()
+ remote.put(['context', guid_1], {'keep': False})
- cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title'])
self.assertEqual(
sorted([
- (guid_1, 'remote', False, 0),
+ {'guid': guid_1, 'title': 'remote', 'keep': False, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
- cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title'])
+ sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
self.assertEqual(
sorted([
- (guid_1, 'remote', False, 0),
- (guid_2, 'remote-2', False, 0),
+ {'guid': guid_1, 'title': 'remote', 'keep': False, 'keep_impl': 0},
+ {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
+ sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
- context = local.Context(guid_1)
- context['title'] = 'local'
- context.post()
- context = local.Context(guid_1, reply=['keep', 'keep_impl', 'title'])
- self.assertEqual('local', context['title'])
+ local.put(['context', guid_1], {'title': 'local'})
- remote.Context(guid_1, keep=True).post()
+ self.assertEqual(
+ {'title': 'local'},
+ local.get(['context', guid_1], reply=['title']))
+
+ remote.put(['context', guid_1], {'keep': True})
- cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title'])
self.assertEqual(
sorted([
- (guid_1, 'local', True, 0),
+ {'guid': guid_1, 'title': 'local', 'keep': True, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
- cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title'])
+ sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
self.assertEqual(
sorted([
- (guid_1, 'remote', True, 0),
- (guid_2, 'remote-2', False, 0),
+ {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0},
+ {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0},
]),
- sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor]))
+ sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result']))
def test_Subscription(self):
- self.fork(self.restful_server)
- coroutine.sleep(1)
-
- self.start_server()
- client = Client('/')
-
- subscription = sockets.SocketFile(coroutine.socket(socket.AF_UNIX))
- subscription.connect('run/subscribe')
- coroutine.sleep(1)
-
- guid = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
-
- self.assertEqual(
- {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False},
- subscription.read_message())
- coroutine.select([subscription.fileno()], [], [])
- event = subscription.read_message()
- event.pop('props')
- event.pop('seqno')
- self.assertEqual(
- {'mountpoint': '/', 'document': 'context', 'event': 'create', 'guid': guid},
- event)
-
- client.Context(guid, title='new-title').post()
-
- coroutine.select([subscription.fileno()], [], [])
- event = subscription.read_message()
- event.pop('props')
- event.pop('seqno')
- self.assertEqual(
- {'mountpoint': '/', 'document': 'context', 'event': 'update', 'guid': guid},
- event)
-
- client.Context.delete(guid)
-
- coroutine.select([subscription.fileno()], [], [])
- event = subscription.read_message()
- event.pop('seqno')
- self.assertEqual(
- {'mountpoint': '/', 'document': 'context', 'event': 'delete', 'guid': guid},
- event)
-
- def test_Connect(self):
+ self.start_ipc_and_restful_server()
+ remote = IPCClient(mountpoint='/')
+ events = []
+
+ def read_events():
+ for event in remote.subscribe():
+ if 'props' in event:
+ event.pop('props')
+ events.append(event)
+ job = coroutine.spawn(read_events)
+
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+ coroutine.dispatch()
+ remote.put(['context', guid], {
+ 'title': 'title_2',
+ })
+ coroutine.dispatch()
+ remote.delete(['context', guid])
+ coroutine.sleep(.5)
+ job.kill()
+
+ self.assertEqual([
+ {'guid': guid, 'seqno': 2, 'document': 'context', 'event': 'create', 'mountpoint': '/'},
+ {'guid': guid, 'seqno': 3, 'document': 'context', 'event': 'update', 'mountpoint': '/'},
+ {'guid': guid, 'seqno': 4, 'event': 'delete', 'document': 'context', 'mountpoint': '/'},
+ ],
+ events)
+
+ def test_Subscription_NotifyOnline(self):
+ self.start_ipc_and_restful_server()
+ remote = IPCClient(mountpoint='/')
+ local = IPCClient(mountpoint='~')
+ events = []
+
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+
+ def read_events():
+ for event in remote.subscribe():
+ if 'props' in event:
+ event.pop('props')
+ events.append(event)
+
+ job = coroutine.spawn(read_events)
+ local.put(['context', guid], {'keep': False})
+ coroutine.sleep(.5)
+ job.kill()
+
+ self.assertEqual([
+ {'document': 'context', 'event': 'update', 'guid': guid, 'seqno': 1},
+ ],
+ events)
+
+ def test_Mount(self):
pid = self.fork(self.restful_server)
- volume = Volume('local', [User, Context])
- mounts = Mountset(volume)
- mounts['/'] = RemoteMount(volume)
- ipc_server = IPCServer(mounts)
- coroutine.spawn(ipc_server.serve_forever)
- client = Client('/')
- subscription = sockets.SocketFile(coroutine.socket(socket.AF_UNIX))
- subscription.connect('run/subscribe')
-
- mounts.open()
+ volume = Volume('local', [User, Context])
+ self.mounts = Mountset(volume)
+ self.mounts['/'] = RemoteMount(volume)
+ self.server = coroutine.WSGIServer(
+ ('localhost', local.ipc_port.value), IPCRouter(self.mounts))
+ coroutine.spawn(self.server.serve_forever)
coroutine.dispatch()
+ remote = IPCClient(mountpoint='/')
+
+ events = []
+ def read_events():
+ for event in remote.subscribe():
+ if 'props' in event:
+ event.pop('props')
+ events.append(event)
+ job = coroutine.spawn(read_events)
+
+ self.assertEqual(False, remote.get(cmd='mounted'))
+ self.mounts.open()
+ self.mounts['/'].mounted.wait()
+ coroutine.sleep(1)
- coroutine.select([subscription.fileno()], [], [])
- self.assertEqual(
- {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False},
- subscription.read_message())
- self.assertEqual(True, client.connected)
+ self.assertEqual(True, remote.get(cmd='mounted'))
+ self.assertEqual([
+ {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False},
+ ],
+ events)
+ del events[:]
self.waitpid(pid)
+ coroutine.sleep(1)
- coroutine.select([subscription.fileno()], [], [])
- self.assertEqual(
- {'mountpoint': '/', 'event': 'unmount', 'name': 'Network', 'private': False},
- subscription.read_message())
- self.assertEqual(False, client.connected)
+ self.assertEqual(False, remote.get(cmd='mounted'))
+ self.assertEqual([
+ {'mountpoint': '/', 'event': 'unmount', 'name': 'Network', 'private': False},
+ ],
+ events)
+ del events[:]
pid = self.fork(self.restful_server)
+ # Ping to trigger re-connection
+ self.assertEqual(False, remote.get(cmd='mounted'))
coroutine.sleep(1)
- self.assertEqual(False, client.connected)
- coroutine.select([subscription.fileno()], [], [])
- self.assertEqual(
- {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False},
- subscription.read_message())
- self.assertEqual(True, client.connected)
-
- self.waitpid(pid)
-
- coroutine.select([subscription.fileno()], [], [])
- self.assertEqual(
- {'mountpoint': '/', 'event': 'unmount', 'name': 'Network', 'private': False},
- subscription.read_message())
- self.assertEqual(False, client.connected)
+ self.assertEqual(True, remote.get(cmd='mounted'))
+ self.assertEqual([
+ {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False},
+ ],
+ events)
+ del events[:]
def test_upload_blob(self):
self.start_ipc_and_restful_server()
- remote = Client('/')
+ remote = IPCClient(mountpoint='/')
- guid = remote.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
self.touch(('file', 'blob'))
- remote.Context(guid).upload_blob('preview', 'file')
- self.assertEqual('blob', remote.Context(guid).get_blob('preview').read())
+ remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file'))
+ blob = remote.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob', file(blob['path']).read())
self.touch(('file2', 'blob2'))
- remote.Context(guid).upload_blob('preview', 'file2', pass_ownership=True)
- self.assertEqual('blob2', remote.Context(guid).get_blob('preview').read())
+ remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file2'), pass_ownership=True)
+ blob = remote.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob2', file(blob['path']).read())
assert not exists('file2')
- def test_StaleBLOBs(self):
- self.start_ipc_and_restful_server()
- remote = Client('/')
-
- guid = remote.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
-
- http.request('PUT', ['context', guid, 'preview'], files={'file': StringIO('blob-1')})
- self.assertEqual('blob-1', remote.Context(guid).get_blob('preview').read())
-
- cache_path = 'cache/context/%s/%s/preview' % (guid[:2], guid)
- self.touch((cache_path, 'blob-2'))
- self.assertEqual('blob-2', remote.Context(guid).get_blob('preview').read())
- self.assertEqual(3, json.load(file(cache_path + '.meta'))['seqno'])
-
- http.request('PUT', ['context', guid, 'preview'], files={'file': StringIO('blob-3')})
- self.assertEqual('blob-3', remote.Context(guid).get_blob('preview').read())
- self.assertEqual(4, json.load(file(cache_path + '.meta'))['seqno'])
-
- def test_DoNotStaleBLOBs(self):
- self.start_ipc_and_restful_server()
- remote = Client('/')
-
- guid = http.request('POST', ['context'],
- headers={'Content-Type': 'application/json'},
- data={
- 'type': 'activity',
- 'title': 'title',
- 'summary': 'summary',
- 'description': 'description',
- })
-
- http.request('PUT', ['context', guid, 'preview'], files={'file': StringIO('blob')})
- self.assertEqual('blob', remote.Context(guid).get_blob('preview').read())
-
- cache_path = 'cache/context/%s/%s/preview' % (guid[:2], guid)
- self.assertEqual(3, json.load(file(cache_path + '.meta'))['seqno'])
-
- # Shift seqno
- connected = coroutine.Event()
- self.mounts.connect(lambda event: connected.set(), event='create', mountpoint='/')
- http.request('POST', ['context'],
- headers={'Content-Type': 'application/json'},
- data={
- 'type': 'activity',
- 'title': 'title2',
- 'summary': 'summary2',
- 'description': 'description2',
- })
- connected.wait()
-
- self.assertEqual('blob', remote.Context(guid).get_blob('preview').read())
- self.assertEqual(4, json.load(file(cache_path + '.meta'))['seqno'])
-
def test_GetAbsentBLOB(self):
self.start_ipc_and_restful_server([User, Report])
- client = Client('/')
+ remote = IPCClient(mountpoint='/')
- guid = client.Report(
- context='context',
- implementation='implementation',
- description='description').post()
+ guid = remote.post(['report'], {
+ 'context': 'context',
+ 'implementation': 'implementation',
+ 'description': 'description',
+ })
- path, mime_type = client.Report(guid).get_blob_path('data')
- self.assertEqual(None, path)
- self.assertEqual(True, client.Report(guid).get_blob('data').closed)
+ self.assertEqual(None, remote.get(['report', guid, 'data'], cmd='get_blob'))
def test_GetDefaultBLOB(self):
self.start_ipc_and_restful_server()
- client = Client('/')
+ remote = IPCClient(mountpoint='/')
- guid = client.Context(
- type='activity',
- title='title',
- summary='summary',
- description='description').post()
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
- path, mime_type = client.Context(guid).get_blob_path('icon')
- assert exists(path)
- self.assertEqual(False, client.Context(guid).get_blob('icon').closed)
+ blob = remote.get(['context', guid, 'icon'], cmd='get_blob')
+ assert not blob['path'].endswith('missing.png')
+ assert exists(blob['path'])
+ assert file(blob['path'], 'rb').read() == file('../../../sugar_network/static/images/missing.png', 'rb').read()
- def test_Localize(self):
- os.environ['LANG'] = 'en_US'
+ def test_StaleBLOBs(self):
self.start_ipc_and_restful_server()
- client = Client('/')
+ remote = IPCClient(mountpoint='/')
- guid = client.Context(
- type='activity',
- title='title_en',
- summary='summary_en',
- description='description_en').post()
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
- res = client.Context(guid, ['title', 'summary', 'description'])
- self.assertEqual('title_en', res['title'])
- self.assertEqual('summary_en', res['summary'])
- self.assertEqual('description_en', res['description'])
+ self.touch(('file', 'blob-1'))
+ remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file'))
+ blob = remote.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob-1', file(blob['path']).read())
- self.stop_servers()
- os.environ['LANG'] = 'ru_RU'
- self.start_ipc_and_restful_server()
- client = Client('/')
+ cache_path = 'cache/context/%s/%s/preview' % (guid[:2], guid)
+ self.touch((cache_path, 'blob-2'))
+ blob = remote.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob-2', file(blob['path']).read())
+ self.assertEqual(3, json.load(file(cache_path + '.meta'))['seqno'])
- res = client.Context(guid, ['title', 'summary', 'description'])
- self.assertEqual('title_en', res['title'])
- self.assertEqual('summary_en', res['summary'])
- self.assertEqual('description_en', res['description'])
+ self.touch(('file', 'blob-3'))
+ remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file'))
+ blob = remote.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob-3', file(blob['path']).read())
+ self.assertEqual(4, json.load(file(cache_path + '.meta'))['seqno'])
- res['title'] = 'title_ru'
- res['summary'] = 'summary_ru'
- res['description'] = 'description_ru'
- res.post()
+ def test_DoNotStaleBLOBs(self):
+ self.start_ipc_and_restful_server()
+ remote = IPCClient(mountpoint='/')
- res = client.Context(guid, ['title', 'summary', 'description'])
- self.assertEqual('title_ru', res['title'])
- self.assertEqual('summary_ru', res['summary'])
- self.assertEqual('description_ru', res['description'])
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
- self.stop_servers()
- os.environ['LANG'] = 'es_ES'
- self.start_ipc_and_restful_server()
- client = Client('/')
+ self.touch(('file', 'blob-1'))
+ remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file'))
+ blob = remote.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob-1', file(blob['path']).read())
- res = client.Context(guid, ['title', 'summary', 'description'])
- self.assertEqual('title_en', res['title'])
- self.assertEqual('summary_en', res['summary'])
- self.assertEqual('description_en', res['description'])
+ cache_path = 'cache/context/%s/%s/preview' % (guid[:2], guid)
+ self.touch((cache_path, 'blob-2'))
+ self.assertEqual(3, json.load(file(cache_path + '.meta'))['seqno'])
- self.stop_servers()
- os.environ['LANG'] = 'ru_RU'
- self.start_ipc_and_restful_server()
- client = Client('/')
+ # Shift seqno
+ remote.put(['context', guid], {'title': 'title-2'})
+ coroutine.sleep(1)
- res = client.Context(guid, ['title', 'summary', 'description'])
- self.assertEqual('title_ru', res['title'])
- self.assertEqual('summary_ru', res['summary'])
- self.assertEqual('description_ru', res['description'])
+ blob = remote.get(['context', guid, 'preview'], cmd='get_blob')
+ self.assertEqual('blob-2', file(blob['path']).read())
+ self.assertEqual(4, json.load(file(cache_path + '.meta'))['seqno'])
if __name__ == '__main__':
diff --git a/tests/units/router.py b/tests/units/router.py
index b2daef8..e9908c2 100755
--- a/tests/units/router.py
+++ b/tests/units/router.py
@@ -13,7 +13,7 @@ from __init__ import tests
import active_document as ad
from sugar_network import node
-from sugar_network.node.router import Router, _Request, _parse_accept_language
+from sugar_network.toolkit.router import Router, _Request, _parse_accept_language, Unauthorized
from active_toolkit import util
from sugar_network.resources.volume import Volume
@@ -203,11 +203,11 @@ class RouterTest(tests.Test):
'PATH_INFO': '/foo',
'REQUEST_METHOD': 'GET',
})
- self.assertRaises(node.Unauthorized, router._authenticate, request)
+ self.assertRaises(Unauthorized, router.authenticate, request)
request.environ['HTTP_SUGAR_USER'] = tests.UID
request.environ['HTTP_SUGAR_USER_SIGNATURE'] = tests.sign(tests.PRIVKEY, tests.UID)
- user = router._authenticate(request)
+ user = router.authenticate(request)
self.assertEqual(tests.UID, user)
def test_Authorization(self):
@@ -226,8 +226,8 @@ class RouterTest(tests.Test):
guid = rest.post('///document//', {'term': 'probe'})
self.assertEqual(
- {'term': 'probe'},
- rest.get('///document///%s///' % guid, reply='term'))
+ 'probe',
+ rest.get('///document///%s///' % guid, reply='term').get('term'))
def test_HandleRedirects(self):
URL = 'http://sugarlabs.org'
@@ -271,6 +271,15 @@ class RouterTest(tests.Test):
class Document(ad.Document):
+ @ad.active_property(prefix='RU', typecast=[], default=[],
+ permissions=ad.ACCESS_CREATE | ad.ACCESS_READ)
+ def user(self, value):
+ return value
+
+ @ad.active_property(prefix='L', typecast=[], default=['public'])
+ def layer(self, value):
+ return value
+
@ad.active_property(slot=1, prefix='A', full_text=True)
def term(self, value):
return value
@@ -294,6 +303,10 @@ class Document(ad.Document):
class User(ad.Document):
+ @ad.active_property(prefix='L', typecast=[], default=['public'])
+ def layer(self, value):
+ return value
+
@ad.active_property(ad.StoredProperty)
def pubkey(self, value):
return value
diff --git a/tests/units/subscribe_socket.py b/tests/units/subscribe_socket.py
deleted file mode 100755
index f63329b..0000000
--- a/tests/units/subscribe_socket.py
+++ /dev/null
@@ -1,152 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import json
-import hashlib
-import tempfile
-
-from gevent import socket
-
-from __init__ import tests
-
-import active_document as ad
-from sugar_network import node
-from active_toolkit import sockets, coroutine, util
-
-
-class SubscribeSocketTest(tests.Test):
-
- def subscribe(self, host, port, ticket):
- conn = coroutine.socket()
- conn.connect((host, port))
- result = sockets.SocketFile(conn)
- result.write_message({'ticket': ticket})
- coroutine.sleep(1)
- return result
-
- def test_Subscribe(self):
- node.only_commit_events.value = False
-
- self.fork(self.restful_server, [User, Document])
- rest = tests.Request('http://localhost:8800')
-
- with self.subscribe(**rest.post('/', None, cmd='subscribe')) as subscription:
- guid = rest.post('/document', {'prop': 'value'})
- self.assertEqual(
- {'layer': ['public']},
- rest.get('/document/' + guid, reply=['layer']))
-
- event = subscription.read_message()
- event.pop('props')
- event.pop('seqno')
- self.assertEqual({
- 'guid': guid,
- 'document': 'document',
- 'event': 'create',
- },
- event)
-
- rest.put('/document/' + guid, {'prop': 'value2'})
- self.assertEqual(
- {'prop': 'value2'},
- rest.get('/document/' + guid, reply=['prop']))
-
- event = subscription.read_message()
- event.pop('props')
- event.pop('seqno')
- self.assertEqual({
- 'event': 'update',
- 'document': 'document',
- 'guid': guid,
- },
- event)
-
- rest.delete('/document/' + guid)
- self.assertRaises(RuntimeError, rest.get, '/document/' + guid)
- event = subscription.read_message()
- event.pop('seqno')
- self.assertEqual({
- 'event': 'delete',
- 'document': 'document',
- 'guid': guid,
- },
- event)
-
- self.assertRaises(socket.timeout, socket.wait_read, subscription.fileno(), 1)
-
- def test_OnlySyncEvents(self):
- node.only_commit_events.value = True
-
- self.fork(self.restful_server, [User, Document])
- rest = tests.Request('http://localhost:8800')
-
- with self.subscribe(**rest.post('/', None, cmd='subscribe')) as subscription:
- guid = rest.post('/document', {'prop': 'value'})
-
- self.assertEqual({
- 'document': 'document',
- 'event': 'commit',
- 'seqno': 2,
- },
- subscription.read_message())
-
- rest.put('/document/' + guid, {'prop': 'value2'})
-
- self.assertEqual({
- 'document': 'document',
- 'event': 'commit',
- 'seqno': 3,
- },
- subscription.read_message())
-
- rest.delete('/document/' + guid)
-
- self.assertEqual({
- 'document': 'document',
- 'event': 'commit',
- 'seqno': 4,
- },
- subscription.read_message())
-
- self.assertRaises(socket.timeout, socket.wait_read, subscription.fileno(), 1)
-
-
-class Document(ad.Document):
-
- @ad.active_property(slot=1, prefix='A', default='')
- def prop(self, value):
- return value
-
- @ad.active_property(ad.StoredProperty, default='')
- def author(self, value):
- return value
-
-
-class User(ad.Document):
-
- @ad.active_property(ad.StoredProperty)
- def pubkey(self, value):
- return value
-
- @ad.active_property(ad.StoredProperty, default='')
- def name(self, value):
- return value
-
- @classmethod
- def before_create(cls, props):
- ssh_pubkey = props['pubkey'].split()[1]
- props['guid'] = str(hashlib.sha1(ssh_pubkey).hexdigest())
-
- with tempfile.NamedTemporaryFile() as tmp_pubkey:
- tmp_pubkey.file.write(props['pubkey'])
- tmp_pubkey.file.flush()
-
- pubkey_pkcs8 = util.assert_call(
- ['ssh-keygen', '-f', tmp_pubkey.name, '-e', '-m', 'PKCS8'])
- props['pubkey'] = pubkey_pkcs8
-
- super(User, cls).before_create(props)
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/sync_master.py b/tests/units/sync_master.py
index ea2e6fb..ab24292 100755
--- a/tests/units/sync_master.py
+++ b/tests/units/sync_master.py
@@ -315,16 +315,12 @@ class SyncMasterTest(tests.Test):
self.assertEqual(None, packet.header.get('dst'))
self.assertEqual([
{'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime},
'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
'prop': {'value': '', 'mtime': os.stat('db/document/1/1/prop').st_mtime},
}},
{'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/2/2/layer').st_mtime},
'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime},
@@ -410,8 +406,6 @@ class SyncMasterTest(tests.Test):
self.assertEqual(None, packet.header.get('dst'))
self.assertEqual([
{'cookie': {'sn_pull': [[2, None]]}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime},
'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
@@ -469,16 +463,12 @@ class SyncMasterTest(tests.Test):
self.assertEqual(None, packet.header.get('dst'))
self.assertEqual([
{'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime},
'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
'prop': {'value': '*' * CHUNK, 'mtime': os.stat('db/document/1/1/prop').st_mtime},
}},
{'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/2/2/layer').st_mtime},
'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime},
@@ -525,8 +515,6 @@ class SyncMasterTest(tests.Test):
self.assertEqual(None, packet.header.get('dst'))
self.assertEqual([
{'cookie': {}, 'filename': 'master-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime},
'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
@@ -605,16 +593,12 @@ class SyncMasterTest(tests.Test):
self.assertEqual(None, packet.header.get('dst'))
self.assertEqual([
{'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime},
'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
'prop': {'value': '', 'mtime': os.stat('db/document/1/1/prop').st_mtime},
}},
{'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/2/2/layer').st_mtime},
'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime},
diff --git a/tests/units/sync_node.py b/tests/units/sync_node.py
index 1e56158..93bdd82 100755
--- a/tests/units/sync_node.py
+++ b/tests/units/sync_node.py
@@ -40,16 +40,12 @@ class SyncNodeTest(tests.Test):
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-2.packet', 'cmd': 'sn_pull', 'src': 'node', 'dst': 'master', 'session': '1', 'sequence': [[1, None]]},
{'api_url': api_url.value, 'filename': 'node-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': '1', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime},
'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
'prop': {'value': 'value1', 'mtime': os.stat('db/document/1/1/prop').st_mtime},
}},
{'api_url': api_url.value, 'filename': 'node-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '2', 'session': '1', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/2/2/layer').st_mtime},
'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime},
@@ -67,8 +63,6 @@ class SyncNodeTest(tests.Test):
node.sync('mnt', session='session')
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': 'session', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime},
'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
@@ -101,8 +95,6 @@ class SyncNodeTest(tests.Test):
self.assertEqual([[2, None]], kwargs['diff_sequence'])
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': 1, 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime},
'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
@@ -116,16 +108,12 @@ class SyncNodeTest(tests.Test):
self.assertEqual([[4, None]], kwargs['diff_sequence'])
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '2', 'session': 2, 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/2/2/layer').st_mtime},
'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime},
'prop': {'value': '*' * 1024, 'mtime': os.stat('db/document/2/2/prop').st_mtime},
}},
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '3', 'session': 2, 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/3/3/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/3/3/layer').st_mtime},
'guid': {'value': '3', 'mtime': os.stat('db/document/3/3/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/3/3/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/3/3/mtime').st_mtime},
@@ -139,24 +127,18 @@ class SyncNodeTest(tests.Test):
self.assertEqual(None, kwargs)
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '4', 'session': 3, 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/4/4/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/4/4/layer').st_mtime},
'guid': {'value': '4', 'mtime': os.stat('db/document/4/4/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/4/4/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/4/4/mtime').st_mtime},
'prop': {'value': '*' * 1024, 'mtime': os.stat('db/document/4/4/prop').st_mtime},
}},
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '5', 'session': 3, 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/5/5/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/5/5/layer').st_mtime},
'guid': {'value': '5', 'mtime': os.stat('db/document/5/5/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/5/5/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/5/5/mtime').st_mtime},
'prop': {'value': '*' * 1024, 'mtime': os.stat('db/document/5/5/prop').st_mtime},
}},
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '6', 'session': 3, 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/6/6/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/6/6/layer').st_mtime},
'guid': {'value': '6', 'mtime': os.stat('db/document/6/6/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/6/6/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/6/6/mtime').st_mtime},
@@ -226,8 +208,6 @@ class SyncNodeTest(tests.Test):
self.assertEqual([
{'filename': 'master.packet', 'content_type': 'records', 'src': 'master', 'dst': 'node', 'cmd': 'sn_ack', 'sequence': [[1, 2]], 'merged': []},
{'api_url': api_url.value, 'filename': 'node-4.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '3', 'session': 'session', 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/3/3/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/3/3/layer').st_mtime},
'guid': {'value': '3', 'mtime': os.stat('db/document/3/3/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/3/3/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/3/3/mtime').st_mtime},
@@ -329,8 +309,6 @@ class SyncNodeTest(tests.Test):
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-1.packet', 'cmd': 'sn_pull', 'src': 'node', 'dst': 'master', 'session': session, 'sequence': [[1, None]]},
{'api_url': api_url.value, 'filename': 'node-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': session, 'diff': {
- 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
- 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime},
'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
diff --git a/tests/units/volume.py b/tests/units/volume.py
index a635b82..1642fb9 100755
--- a/tests/units/volume.py
+++ b/tests/units/volume.py
@@ -1,12 +1,15 @@
#!/usr/bin/env python
# sugar-lint: disable
+import json
+
from __init__ import tests
import active_document as ad
from sugar_network.toolkit.collection import Sequence
from sugar_network.toolkit.sneakernet import InPacket, OutBufferPacket, DiskFull
-from sugar_network.resources.volume import Volume, Resource
+from sugar_network.resources.volume import Volume, Resource, Commands
+from active_toolkit import coroutine
class VolumeTest(tests.Test):
@@ -160,6 +163,96 @@ class VolumeTest(tests.Test):
],
events)
+ def test_Subscribe(self):
+
+ class Document(Resource):
+
+ @ad.active_property(slot=1)
+ def prop(self, value):
+ return value
+
+ volume = Volume('db', [Document])
+ cp = TestCommands(volume)
+ events = []
+
+ def read_events():
+ for event in cp.subscribe(ad.Response()):
+ assert event.startswith('data: ')
+ assert event.endswith('\n\n')
+ event = json.loads(event[6:])
+ if 'props' in event:
+ event.pop('props')
+ events.append(event)
+
+ job = coroutine.spawn(read_events)
+ coroutine.dispatch()
+ volume['document'].create(guid='guid', prop='value1')
+ coroutine.dispatch()
+ volume['document'].update('guid', prop='value2')
+ coroutine.dispatch()
+ volume['document'].delete('guid')
+ coroutine.dispatch()
+ volume['document'].commit()
+ coroutine.sleep(.5)
+ job.kill()
+
+ self.assertEqual([
+ {'guid': 'guid', 'seqno': 1, 'document': 'document', 'event': 'create'},
+ {'guid': 'guid', 'seqno': 2, 'document': 'document', 'event': 'update'},
+ {'guid': 'guid', 'event': 'delete', 'document': u'document'},
+ ],
+ events)
+
+ def test_SubscribeToOnlyCommits(self):
+
+ class Document(Resource):
+
+ @ad.active_property(slot=1)
+ def prop(self, value):
+ return value
+
+ volume = Volume('db', [Document])
+ cp = TestCommands(volume)
+ events = []
+
+ def read_events():
+ for event in cp.subscribe(ad.Response(), only_commits=True):
+ assert event.startswith('data: ')
+ assert event.endswith('\n\n')
+ event = json.loads(event[6:])
+ if 'props' in event:
+ event.pop('props')
+ events.append(event)
+
+ job = coroutine.spawn(read_events)
+ coroutine.dispatch()
+ volume['document'].create(guid='guid', prop='value1')
+ coroutine.dispatch()
+ volume['document'].update('guid', prop='value2')
+ coroutine.dispatch()
+ volume['document'].delete('guid')
+ coroutine.dispatch()
+ volume['document'].commit()
+ coroutine.sleep(.5)
+ job.kill()
+
+ self.assertEqual([
+ {'seqno': 1, 'document': 'document', 'event': 'commit'},
+ {'seqno': 2, 'document': 'document', 'event': 'commit'},
+ {'seqno': 2, 'document': 'document', 'event': 'commit'},
+ ],
+ events)
+
+
+class TestCommands(ad.VolumeCommands, Commands):
+
+ def __init__(self, volume):
+ ad.VolumeCommands.__init__(self, volume)
+ Commands.__init__(self)
+
+ def connect(self, callback, condition=None, **kwargs):
+ self.volume.connect(callback, condition)
+
def read_packet(packet):
result = []