1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
#!/usr/bin/env python
# sugar-lint: disable
import os
import shutil
from os.path import exists
from __init__ import tests, src_root
from sugar_network import db, client
from sugar_network.client import IPCClient
from sugar_network.client.commands import ClientCommands
from sugar_network.client import IPCRouter
from sugar_network.resources.volume import Volume
from sugar_network.toolkit import mountpoints, coroutine
class ServerCommandsTest(tests.Test):
    """Tests for `ClientCommands` driven by a `disk/sugar-network` directory.

    The cases below create or remove a `disk/sugar-network` directory under
    a path watched through `mountpoints`, check how `ClientCommands.inline()`
    reacts, and then talk to the commands processor over HTTP via `IPCClient`.
    """

    def start_node(self):
        """Bring a `ClientCommands` instance online and serve it over IPC.

        Creates `disk/sugar-network` relative to the current directory,
        builds a `Volume('db')` (kept on `self.node_volume`), waits for the
        `inline`/`online` event and then starts a WSGI server on
        `localhost:<client.ipc_port>` wrapping the commands in `IPCRouter`.

        Returns the `ClientCommands` instance.
        """
        os.makedirs('disk/sugar-network')
        self.node_volume = Volume('db')
        cp = ClientCommands(self.node_volume)

        # Arm the trigger before the monitor starts so the event is not missed.
        trigger = self.wait_for_events(cp, event='inline', state='online')
        # NOTE(review): monitors tests.tmpdir while the directory above is
        # created relative to cwd -- presumably the test cwd is tests.tmpdir.
        coroutine.spawn(mountpoints.monitor, tests.tmpdir)
        trigger.wait()

        server = coroutine.WSGIServer(
                ('localhost', client.ipc_port.value), IPCRouter(cp))
        coroutine.spawn(server.serve_forever)
        # Yield once so the spawned server gets a chance to start.
        coroutine.dispatch()
        return cp

    def test_PopulateNode(self):
        # The directory already exists when mountpoints are populated, so
        # the online event must have fired by the time populate() returns.
        os.makedirs('disk/sugar-network')
        volume = Volume('db')
        cp = ClientCommands(volume)

        assert not cp.inline()
        trigger = self.wait_for_events(cp, event='inline', state='online')
        mountpoints.populate('.')
        assert trigger.value is not None
        assert cp.inline()

    def test_MountNode(self):
        # Nothing to find at populate time; the node goes online only after
        # the directory appears while the monitor is running.
        volume = Volume('db')
        cp = ClientCommands(volume)

        trigger = self.wait_for_events(cp, event='inline', state='online')
        mountpoints.populate('.')
        assert not cp.inline()
        assert trigger.value is None

        coroutine.spawn(mountpoints.monitor, '.')
        coroutine.dispatch()
        os.makedirs('disk/sugar-network')
        trigger.wait()
        assert cp.inline()

    def test_UnmountNode(self):
        # Removing the `disk` tree must switch the node back to offline.
        cp = self.start_node()
        assert cp.inline()

        trigger = self.wait_for_events(cp, event='inline', state='offline')
        shutil.rmtree('disk')
        trigger.wait()
        assert not cp.inline()

    def test_whoami(self):
        # The `whoami` command reports the test user's guid with no roles.
        self.start_node()
        ipc = IPCClient()

        self.assertEqual(
                {'guid': tests.UID, 'roles': []},
                ipc.get(cmd='whoami'))

    def test_subscribe(self):
        # Create, update and delete a context while a background job
        # collects the events delivered by `ipc.subscribe()`.
        self.start_node()
        ipc = IPCClient()
        events = []

        def read_events():
            # NOTE(review): '!commit' presumably filters out commit events --
            # confirm against the subscribe implementation.
            for event in ipc.subscribe(event='!commit'):
                # Drop `props` so the comparison below depends only on
                # guid/document/event.
                if 'props' in event:
                    event.pop('props')
                events.append(event)

        job = coroutine.spawn(read_events)
        coroutine.dispatch()

        guid = ipc.post(['context'], {
            'type': 'activity',
            'title': 'title',
            'summary': 'summary',
            'description': 'description',
            })
        coroutine.dispatch()
        ipc.put(['context', guid], {
            'title': 'title_2',
            })
        coroutine.dispatch()
        ipc.delete(['context', guid])
        # Presumably gives the last event time to reach the reader job.
        coroutine.sleep(.5)
        job.kill()

        self.assertEqual([
            {'guid': guid, 'document': 'context', 'event': 'create'},
            {'guid': guid, 'document': 'context', 'event': 'update'},
            {'guid': guid, 'event': 'delete', 'document': 'context'},
            ],
            events)

    def test_BLOBs(self):
        # An uploaded `preview` is returned verbatim and exposed as a URL;
        # the `icon`, never uploaded here, falls back to the bundled
        # `missing.png`.
        self.start_node()
        ipc = IPCClient()

        guid = ipc.post(['context'], {
            'type': 'activity',
            'title': 'title',
            'summary': 'summary',
            'description': 'description',
            })
        ipc.request('PUT', ['context', guid, 'preview'], 'image')

        self.assertEqual(
                'image',
                ipc.request('GET', ['context', guid, 'preview']).content)
        # NOTE(review): the URLs below hard-code port 5555 -- assumes the
        # test environment sets client.ipc_port to that value.
        self.assertEqual(
                {'preview': 'http://localhost:5555/context/%s/preview' % guid},
                ipc.get(['context', guid], reply=['preview']))
        self.assertEqual(
                [{'preview': 'http://localhost:5555/context/%s/preview' % guid}],
                ipc.get(['context'], reply=['preview'])['result'])

        # Read the reference image in binary mode and close the handle
        # deterministically instead of leaking it via `file(...).read()`.
        missing_png_path = \
                src_root + '/sugar_network/static/httpdocs/images/missing.png'
        with open(missing_png_path, 'rb') as missing_png:
            missing_png_data = missing_png.read()

        self.assertEqual(
                missing_png_data,
                ipc.request('GET', ['context', guid, 'icon']).content)
        self.assertEqual(
                {'icon': 'http://localhost:5555/static/images/missing.png'},
                ipc.get(['context', guid], reply=['icon']))
        self.assertEqual(
                [{'icon': 'http://localhost:5555/static/images/missing.png'}],
                ipc.get(['context'], reply=['icon'])['result'])
# Run the suite through the project's test runner when this file is
# executed directly rather than imported.
if __name__ == '__main__':
    tests.main()
|