>>> import os >>> import tempfile >>> import time Define some helper functions >>> def test_unique(items) : ... return not [True for e in items if items.count(e) > 1] Connect to datastore using DBus and wait for it to get ready: >>> import dbus >>> DS_DBUS_SERVICE = "org.silbe.GDataStore" >>> DS_DBUS_INTERFACE = "org.silbe.GDataStore" >>> DS_DBUS_PATH = "/org/silbe/GDataStore" >>> bus = dbus.SessionBus() >>> ds = dbus.Interface(bus.get_object(DS_DBUS_SERVICE, DS_DBUS_PATH), DS_DBUS_INTERFACE) Make sure we're starting from an empty datastore: >>> assert ds.find({}, [], byte_arrays=True) == ([], 0) Create something to play with: >>> o1_uid = ds.create({'title': 'DS test object 1', 'mime_type': 'text/plain', 'activity': 'org.silbe.DataStoreTest1'}, '', False) >>> assert isinstance(o1_uid, basestring) >>> o2_uid = ds.create({'title': 'DS test object 2', 'mime_type': 'text/plain', 'activity': 'org.silbe.DataStoreTest2'}, '', False) >>> assert isinstance(o2_uid, basestring) >>> o3_uid = ds.create({'title': 'DS test object 3', 'mime_type': 'text/plain', 'activity': 'org.silbe.DataStoreTest2'}, '', False) >>> assert isinstance(o3_uid, basestring) >>> assert test_unique([o1_uid, o2_uid, o3_uid]) Check everything is there: >>> sorted(ds.find({}, ['title', 'activity'], byte_arrays=True)[0]) [dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 1', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest2', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 3', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest2', variant_level=1)}, signature=dbus.Signature('sv'))] >>> ds.get_filename(o1_uid, byte_arrays=True) dbus.String(u'') >>> ds.get_filename(o2_uid, byte_arrays=True) dbus.String(u'') >>> ds.get_filename(o3_uid, byte_arrays=True) dbus.String(u'') Test get_uniquevaluesfor(). >>> sorted(ds.get_uniquevaluesfor('activity', {})) [dbus.String(u'org.silbe.DataStoreTest1'), dbus.String(u'org.silbe.DataStoreTest2')] Change some entries: >>> ds.update(o1_uid, {'title': 'DS test object 1 updated', 'mime_type': 'text/plain', 'activity': 'org.silbe.DataStoreTest1', 'tags': 'foo'}, '', False) >>> ds.update(o2_uid, {'title': 'DS test object 2', 'mime_type': 'text/plain', 'activity': 'org.silbe.DataStoreTest1', 'tags': 'bar baz'}, '', False) >>> ds.update(o3_uid, {'title': 'DS test object 2', 'mime_type': 'text/html', 'activity': 'org.silbe.DataStoreTest3', 'timestamp': 10000}, '', False) >>> sorted(ds.find({}, ['title', 'activity'], byte_arrays=True)[0]) [dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 1 updated', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))] Retrieve metadata for a single entry, ignoring variable data: >>> d=dict(ds.get_properties(o3_uid, byte_arrays=True)) >>> del d['uid'], d['timestamp'] >>> d {dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/html', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest3', variant_level=1)} Find entries using "known" metadata: >>> sorted(ds.find({'mime_type': ['text/plain']}, ['title', 'activity', 'mime_type', 'tags'], byte_arrays=True)[0]) [dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/plain', variant_level=1), dbus.String(u'tags'): dbus.ByteArray('bar baz', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 1 updated', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/plain', variant_level=1), dbus.String(u'tags'): dbus.ByteArray('foo', variant_level=1)}, signature=dbus.Signature('sv'))] >>> sorted(ds.find({'mime_type': ['text/html']}, ['title', 'activity', 'mime_type', 'tags'], byte_arrays=True)[0]) [dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/html', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))] >>> sorted(ds.find({'uid': o3_uid}, ['title', 'activity', 'mime_type'], byte_arrays=True)[0]) [dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/html', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))] >>> sorted(ds.find({'timestamp': (9000, 11000)}, ['title', 'activity', 'mime_type'], byte_arrays=True)[0]) [dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/html', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))] Find entries using "unknown" metadata (=> returns all entries): >>> sorted(ds.find({'title': 'DS test object 2'}, ['title', 'activity', 'mime_type', 'tags'], byte_arrays=True)[0]) [dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/html', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/plain', variant_level=1), dbus.String(u'tags'): dbus.ByteArray('bar baz', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 1 updated', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1), dbus.String(u'mime_type'): dbus.ByteArray('text/plain', variant_level=1), dbus.String(u'tags'): dbus.ByteArray('foo', variant_level=1)}, signature=dbus.Signature('sv'))] Specify a sort order: >>> ds.find({'order_by': ['+title']}, ['title', 'activity'], byte_arrays=True)[0] dbus.Array([dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 1 updated', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv'))], signature=dbus.Signature('a{sv}')) >>> ds.find({'order_by': ['-title']}, ['title', 'activity'], byte_arrays=True)[0] dbus.Array([dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 1 updated', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))], signature=dbus.Signature('a{sv}')) Delete an entry: >>> ds.delete(o1_uid) >>> sorted(ds.find({}, ['title', 'activity'], byte_arrays=True)[0]) [dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.silbe.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))] Create an entry with content: >>> dog_content = 'The quick brown dog jumped over the lazy fox.' >>> dog_props = {'title': 'dog/fox story', 'mime_type': 'text/plain'} >>> dog_file = tempfile.NamedTemporaryFile() >>> dog_file.write(dog_content) >>> dog_file.flush() >>> dog_uid = ds.create(dog_props, dog_file.name, False) Retrieve and verify the entry with content: >>> dog_retrieved = ds.get_filename(dog_uid) >>> assert(file(dog_retrieved).read() == dog_content) >>> os.remove(dog_retrieved) Update the entry content: >>> dog_content = 'The quick brown fox jumped over the lazy dog.' >>> dog_file.seek(0) >>> dog_file.write(dog_content) >>> dog_file.flush() >>> ds.update(dog_uid, dog_props, dog_file.name, False) Verify updated content: >>> dog_retrieved = ds.get_filename(dog_uid) >>> assert(file(dog_retrieved).read() == dog_content) >>> os.remove(dog_retrieved) >>> dog_file.close()