Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/tests/basic_api_v2.txt
blob: b0c4a88c039853d2dbdb2fe405680129f84c385d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
>>> import os
>>> import tempfile
>>> import time

Define some helper functions
>>> def test_unique(items):
...     return not [True for e in items if items.count(e) > 1]
>>> def to_native(value):
...     if isinstance(value, list):
...         return [to_native(e) for e in value]
...     elif isinstance(value, dict):
...         return dict([(to_native(k), to_native(v)) for k, v in value.items()])
...     elif isinstance(value, unicode):
...         return unicode(value)
...     elif isinstance(value, str):
...         return str(value)
...     return value


Connect to datastore using DBus and wait for it to get ready:
>>> import dbus
>>> DS_DBUS_SERVICE = 'org.laptop.sugar.DataStore'
>>> DS_DBUS_INTERFACE = 'org.laptop.sugar.DataStore'
>>> DS_DBUS_PATH = '/org/laptop/sugar/DataStore'
>>> bus = dbus.SessionBus()
>>> ds = dbus.Interface(bus.get_object(DS_DBUS_SERVICE, DS_DBUS_PATH), DS_DBUS_INTERFACE)


Make sure we're starting from an empty datastore:
>>> assert ds.find({}, [], byte_arrays=True) == ([], 0)


Create something to play with:
>>> o1_uid = ds.create({'title': 'DS test object 1', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1'}, '', False)
>>> assert isinstance(o1_uid, basestring)
>>> o2_uid = ds.create({'title': 'DS test object 2', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest2'}, '', False)
>>> assert isinstance(o2_uid, basestring)
>>> o3_uid = ds.create({'title': 'DS test object 3', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest2'}, '', False)
>>> assert isinstance(o3_uid, basestring)
>>> assert test_unique([o1_uid, o2_uid, o3_uid])


Check everything is there:
>>> sorted(ds.find({}, ['title', 'activity'], byte_arrays=True)[0])
[dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 1', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest2', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 3', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest2', variant_level=1)}, signature=dbus.Signature('sv'))]
>>> ds.get_filename(o1_uid, byte_arrays=True)
dbus.String(u'')
>>> ds.get_filename(o2_uid, byte_arrays=True)
dbus.String(u'')
>>> ds.get_filename(o3_uid, byte_arrays=True)
dbus.String(u'')



Test get_uniquevaluesfor().
>>> sorted(ds.get_uniquevaluesfor('activity', {}))
[dbus.String(u'org.sugarlabs.DataStoreTest1'), dbus.String(u'org.sugarlabs.DataStoreTest2')]


Change some entries:
>>> ds.update(o1_uid, {'title': 'DS test object 1 updated', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'foo'}, '', False)
>>> ds.update(o2_uid, {'title': 'DS test object 2', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'bar baz'}, '', False)
>>> ds.update(o3_uid, {'title': 'DS test object 2', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest3', 'timestamp': 10000}, '', False)
>>> sorted(to_native(ds.find({}, ['title', 'activity'], byte_arrays=True)[0]))
[{u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 1 updated'}, {u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 2'}, {u'activity': 'org.sugarlabs.DataStoreTest3', u'title': 'DS test object 2'}]

Retrieve metadata for a single entry, ignoring variable data:
>>> d=dict(ds.get_properties(o3_uid, byte_arrays=True))
>>> del d['uid'], d['timestamp'], d['creation_time']
>>> assert to_native(d) == {u'title': 'DS test object 2', u'mime_type': 'text/html', u'activity': 'org.sugarlabs.DataStoreTest3'}


Find entries using "known" metadata:
>>> sorted(ds.find({'mime_type': ['text/plain']}, ['title', 'activity', 'mime_type', 'tags'], byte_arrays=True)[0])
[dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest1', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/plain', variant_level=1), dbus.String(u'tags'): dbus.ByteArray('bar baz', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 1 updated', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest1', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/plain', variant_level=1), dbus.String(u'tags'): dbus.ByteArray('foo', variant_level=1)}, signature=dbus.Signature('sv'))]
>>> sorted(ds.find({'mime_type': ['text/html']}, ['title', 'activity', 'mime_type', 'tags'], byte_arrays=True)[0])
[dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/html', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))]
>>> sorted(ds.find({'uid': o3_uid}, ['title', 'activity', 'mime_type'], byte_arrays=True)[0])
[dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/html', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))]
>>> sorted(ds.find({'timestamp': (9000, 11000)}, ['title', 'activity', 'mime_type'], byte_arrays=True)[0])
[dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/html', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))]

Find entries using "unknown" metadata (=> returns all entries):
>>> sorted(ds.find({'title': 'DS test object 2'}, ['title', 'activity', 'mime_type', 'tags'], byte_arrays=True)[0])
[dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/html', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest1', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/plain', variant_level=1), dbus.String(u'tags'): dbus.ByteArray('bar baz', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 1 updated', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest1', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/plain', variant_level=1), dbus.String(u'tags'): dbus.ByteArray('foo', variant_level=1)}, signature=dbus.Signature('sv'))]

You can specify a (primary) sort order. Please note that the secondary sort order is undefined / implementation-dependent.
>>> to_native(ds.find({'order_by': ['+title']}, ['title', 'activity'], byte_arrays=True)[0])
[{u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 2'}, {u'activity': 'org.sugarlabs.DataStoreTest3', u'title': 'DS test object 2'}, {u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 1 updated'}]
>>> to_native(ds.find({'order_by': ['-title']}, ['title', 'activity'], byte_arrays=True)[0])
[{u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 1 updated'}, {u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 2'}, {u'activity': 'org.sugarlabs.DataStoreTest3', u'title': 'DS test object 2'}]

Delete an entry:
>>> ds.delete(o1_uid)
>>> sorted(ds.find({}, ['title', 'activity'], byte_arrays=True)[0])
[dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))]


Create an entry with content:
>>> dog_content = 'The quick brown dog jumped over the lazy fox.'
>>> dog_props = {'title': 'dog/fox story', 'mime_type': 'text/plain'}
>>> dog_file = tempfile.NamedTemporaryFile()
>>> dog_file.write(dog_content)
>>> dog_file.flush()
>>> dog_uid = ds.create(dog_props, dog_file.name, False)

Retrieve and verify the entry with content:
>>> dog_retrieved = ds.get_filename(dog_uid)
>>> assert(file(dog_retrieved).read() == dog_content)
>>> os.remove(dog_retrieved)

Update the entry content:
>>> dog_content = 'The quick brown fox jumped over the lazy dog.'
>>> dog_file.seek(0)
>>> dog_file.write(dog_content)
>>> dog_file.flush()
>>> ds.update(dog_uid, dog_props, dog_file.name, False)

Verify updated content:
>>> dog_retrieved = ds.get_filename(dog_uid)
>>> assert(file(dog_retrieved).read() == dog_content)
>>> os.remove(dog_retrieved)
>>> dog_file.close()