diff options
-rwxr-xr-x | bin/datastore-service                 |   3
-rw-r--r-- | bin/test-trial3.py                    | 202
-rw-r--r-- | docs/requirements.txt                 | 110
-rw-r--r-- | src/olpc/datastore/backingstore.py    |  82
-rw-r--r-- | src/olpc/datastore/bin_copy.py        |   5
-rw-r--r-- | src/olpc/datastore/config.py          |  13
-rw-r--r-- | src/olpc/datastore/converter.py       |   8
-rw-r--r-- | src/olpc/datastore/datastore.py       | 239
-rw-r--r-- | src/olpc/datastore/hg_backingstore.py | 426
-rw-r--r-- | src/olpc/datastore/lru.py             |  81
-rw-r--r-- | src/olpc/datastore/model.py           |  70
-rw-r--r-- | src/olpc/datastore/sxattr.py          |  63
-rw-r--r-- | src/olpc/datastore/utils.py           |   3
-rw-r--r-- | src/olpc/datastore/xapianindex.py     | 219
-rw-r--r-- | tests/mountpoints.txt                 |   5
-rw-r--r-- | tests/runalltests.py                  |   2
-rw-r--r-- | tests/simple_versions.txt             | 105
-rw-r--r-- | tests/test_hgrepo.py                  | 115
-rw-r--r-- | tests/test_model.py                   |   2
-rw-r--r-- | tests/testutils.py                    |  18
-rw-r--r-- | tests/xapianindex.txt                 |   2
21 files changed, 1617 insertions, 156 deletions
diff --git a/bin/datastore-service b/bin/datastore-service
index c84e314..c2a29fa 100755
--- a/bin/datastore-service
+++ b/bin/datastore-service
@@ -3,7 +3,7 @@ import sys, os, signal
 import gobject
 import dbus.service
 import dbus.mainloop.glib
-from olpc.datastore import DataStore, DS_LOG_CHANNEL, backingstore
+from olpc.datastore import DataStore, DS_LOG_CHANNEL, backingstore, hg_backingstore
 import logging
 
 # Path handling
@@ -48,6 +48,7 @@ bus = dbus.SessionBus()
 ds = DataStore()
 ds.registerBackend(backingstore.FileBackingStore)
 ds.registerBackend(backingstore.InplaceFileBackingStore)
+ds.registerBackend(hg_backingstore.HgBackingStore)
 ds.mount(repo_dir)
 ds.complete_indexing()
diff --git a/bin/test-trial3.py b/bin/test-trial3.py
new file mode 100644
index 0000000..76594c8
--- /dev/null
+++ b/bin/test-trial3.py
@@ -0,0 +1,202 @@
+import dbus
+import os
+import shutil
+import popen2
+import tempfile
+
+DEFAULT_STORE = "/tmp/store1"
+
+def _create_temp(content):
+    pt, pp = tempfile.mkstemp(suffix=".txt")
+    os.write(pt, content)
+    os.close(pt)
+    return pp
+
+
+def _create_temp_odt(content):
+    # This just isn't working on my system
+    # first write the contents to a temp file, then convert it
+    pt, pp = tempfile.mkstemp(suffix=".txt")
+    os.write(pt, content)
+    os.close(pt)
+
+    f, temp_path = tempfile.mkstemp(suffix='.odt')
+    del f
+    # my abiword didn't support fd://0
+    cmd = 'abiword --to=odt --to-name=%s %s' % (temp_path, pp)
+    print cmd
+    child_stdout, child_stdin, child_stderr = popen2.popen3(cmd)
+    child_stdin.write(content)
+    child_stdin.close()
+    child_stdout.close()
+    child_stderr.close()
+    os.unlink(pp)
+
+    return temp_path
+
+def start():
+    if os.path.exists(DEFAULT_STORE):
+        os.system("rm -rf %s" % DEFAULT_STORE)
+
+    bus = dbus.SessionBus()
+    bobj = bus.get_object("org.laptop.sugar.DataStore",
+                          "/org/laptop/sugar/DataStore")
+
+    ds = dbus.Interface(bobj, dbus_interface='org.laptop.sugar.DataStore')
+
+    mp = ds.mount("hg:%s" % DEFAULT_STORE,
+                  dict(title="Primary Storage"))
+    return ds, mp
+
+def stop(ds):
+    ds.stop()
+
+
+ds, mp = start()
+# Browse starts download
+print "Download Simulation"
+props = {'title': 'Downloading test.pdf from \nhttp://example.org/test.pdf.',
+         'mime_type': 'application/pdf',
+         'progress': '0',
+         'mountpoint' : mp,
+         }
+uid, vid = ds.checkin(props, '')
+print 'Created download: %s %s' % (uid, vid)
+assert uid
+assert vid == '1'
+ds.complete_indexing()
+
+# Browse notifies the DS about the progress
+props['uid'] = uid
+for i in range(1, 5):
+    props['progress'] = str(i * 20)
+    props['vid'] = vid
+    uid, vid = ds.checkin(props, '')
+    print 'Updated download: %s %s %s' % (uid, vid, props['progress'])
+    assert uid
+    ds.complete_indexing()
+
+# Browse finishes the download
+# Now assume we have a file called tests/test.pdf (which there is if
+# this is run from the project root)
+# Checkin gives the file to the datastore; it no longer owns it and
+# it will be removed when the datastore is done with it
+source_path = "tests/test.pdf"
+dest_path = "/tmp/test.pdf"
+# we don't want it to kill our test file so we copy it to /tmp
+shutil.copy(source_path, dest_path)
+
+props['title'] = 'File test.pdf downloaded from\nhttp://example.org/test.pdf.'
+props['progress'] = '100'
+uid, vid = ds.checkin(props, dest_path)
+ds.complete_indexing()
+print 'Updated download with file: %s %s %s' % (uid, vid, props['progress'])
+assert uid
+#assert vid == '1'
+
+
+# Check the DS has removed the file.
+assert not os.path.exists(dest_path)
+
+# Journal displays the download
+objects, count = ds.find({'title': 'downloaded', 'order_by' : ['-vid']})
+props = objects[0]
+print 'Query returned: %s' % props['uid'], count
+assert props['vid'] == vid, """%s != %s""" % (props['vid'], vid) # the last rev
+
+# Read resumes the entry
+props, file_path = ds.checkout(uid, '', '', '', '')
+print 'Entry checked out: %s %s\n%s' % (uid, file_path, props)
+assert props
+assert props['vid'] == vid
+assert file_path
+
+# Read saves position and zoom level
+props['position'] = '15'
+props['zoom_level'] = '150'
+props['activity'] = 'org.laptop.sugar.Read'
+
+
+uid, nvid = ds.checkin(props, file_path)
+print 'Updated Read state: %s %s' % (uid, nvid)
+assert uid
+assert nvid != "1"
+assert nvid > vid, "%s < %s" % (nvid, vid)
+ds.complete_indexing()
+
+
+print "DONE"
+#stop(ds)
+
+# test_writing
+#ds, mp = start()
+print "Writing test"
+# Create empty entry
+props = {'title': 'Write activity', 'mountpoint' : mp}
+uid, vid = ds.checkin(props, '')
+print 'Created entry: %s %s' % (uid, vid)
+assert uid
+assert vid == '1'
+
+ds.complete_indexing()
+
+# First checkout
+props, file_path = ds.checkout(uid, '', '', '', '')
+print 'Entry checked out: %s %s' % (uid, file_path)
+assert props
+assert props['vid'] == '1'
+assert file_path == ''
+
+
+# Write first contents
+file_path = _create_temp('blah blah 1')
+
+props['mountpoint'] = mp
+props['mime_type'] = 'text/plain'
+uid, vid = ds.checkin(props, file_path)
+ds.complete_indexing()
+print 'First checkin: %s %s %s' % (uid, vid, file_path)
+assert uid
+#assert vid == '1'
+assert not os.path.exists(file_path)
+
+# Second checkout
+props, file_path = ds.checkout(uid, '', '', '', '')
+print 'Entry checked out: %s %s' % (props['uid'], file_path)
+assert props
+#assert props['vid'] == '1'
+assert file_path
+
+# Write second contents
+file_path = _create_temp('blah blah 1\nbleh bleh 2')
+props['mime_type'] = 'text/plain'
+props['mountpoint'] = mp
+uid, vid = ds.checkin(props, file_path)
+ds.complete_indexing()
+print 'Second checkin: %s %s %s' % (uid, vid, file_path)
+assert uid
+#assert vid == '2'
+assert not os.path.exists(file_path)
+
+print "DONE"
+#stop(ds)
+
+
+# PREVIEW test
+#ds, mp = start()
+print "preview test"
+props = {'title': 'Write activity',
+         'preview': dbus.ByteArray('\123\456\789\000\123'),
+         'mountpoint' : mp}
+uid, vid = ds.checkin(props, '')
+print 'Created entry: %s %s' % (uid, vid)
+assert uid
+assert vid == '1'
+
+props, file_path = ds.checkout(uid, '', '', '', '')
+print props
+
+print "DONE"
+#stop(ds)
diff --git a/docs/requirements.txt b/docs/requirements.txt
new file mode 100644
index 0000000..6b6b538
--- /dev/null
+++ b/docs/requirements.txt
@@ -0,0 +1,110 @@
+Datastore
+=========
+
+An outline of the basic requirements and design decisions of the
+Datastore used in Sugar.
+
+
+Description
+-----------
+
+The Datastore (DS) is a context storage and indexing tool. It is
+designed to provide efficient access to content while supplying both
+browse and search models on top of this content. It does this
+without demanding any notion of a content repository, location or a
+data API from the consumer. Additionally, the Datastore supports
+recovery of older versions of content and supports both backups and
+remote storage.
+
+
+Constraints
+-----------
+
+The system is designed to run in a hostile environment. Operation
+runtime and space are both highly constrained and only limited CPU
+time/power is available to complete tasks.
+
+1. Limited Memory
+2. Limited Processing Power
+3. Limited Storage
+4. Attempt to limit the number of writes to the medium.
+5. All interactions through the public interface should return as soon
+as possible. There is a 10 second window available to DBus; failure to
+return in this time causes errors. Failure to return in under 0.2
+seconds can result in the UI blocking.
+
+Point 5 seems a system design flaw to me. The need for atomic
+I/O bound operations is at odds with a completely asynchronous model,
+and callers shouldn't call into such a system in a way that would make
+it block.
+
+
+
+Versioning/Storage
+------------------
+
+The datastore needs to remain efficient in terms of both space and
+time while delivering content in a reliable fashion. Additionally,
+because it is designed to function in an environment where we hope to
+minimize the number of writes, it must provide:
+
+1. Recovery of previous versions of documents
+2. Efficient storage
+   Should work with both normal text and binary data
+3. Should allow the archival of old versions, removing the need to
+store the entire version history.
+4. Should support synchronization with remote stores.
+
+
+Searching/Indexing
+------------------
+
+The DS provides searchable metadata and content indexing for the vast
+majority of content on the system.
+
+
+1. Searches should reflect completed operations immediately (even
+though the operations happen asynchronously).
+
+2. Fulltext searching of content should be possible and
+accessible, even through historic versions.
+
+3. Fulltext should support stemming of common terms in a given
+language.
+
+
+
+Archival/Backups
+----------------
+
+The system should provide a model for long term storage and backup
+support. Old versions of content should be migrated to long term
+storage. The system should provide a way to identify content no longer
+needed by the runtime when:
+    1. Connected to a remote store
+    2. Connected to the school server
+    3. Space is limited
+    4. System is idle
+
+The system should identify content subject to backups, begin a remote
+transaction with the storage repo, migrate the old versions over an
+SSH connection, and then remove the old versions and the index
+information for them from the local store.
+
+
+
+Remote Repositories
+-------------------
+
+The DS is capable of mounting additional stores and having them
+function as a single unit. This can extend from USB devices to remote
+network based storage (through the use of SSH).
+
+If the model is SSH based then the remote stores don't require an
+active server runtime, but will have increased latency for common
+operations as indexes must be loaded on a per-request basis. Counting
+on the remote OS to cache and manage this is an option, a TTL based
+server start is an option, and a per-user or per-machine server is also
+possible.
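
[Editor's note: the versioning requirements above are what the new
checkin/checkout D-Bus calls in this commit implement. As a rough,
illustrative sketch only -- the service and object names are the real
ones from src/olpc/datastore/config.py, but the store path, title, and
filename are invented -- a client session against that API looks like:

    # Illustrative client round trip against the versioned store.
    import dbus

    bus = dbus.SessionBus()
    bobj = bus.get_object("org.laptop.sugar.DataStore",
                          "/org/laptop/sugar/DataStore")
    ds = dbus.Interface(bobj, dbus_interface="org.laptop.sugar.DataStore")

    # mount an hg-backed (versioning) store
    mp = ds.mount("hg:/tmp/store1", dict(title="Primary Storage"))

    # checkin creates revision 1; a later checkin carrying the returned
    # uid creates revision 2 of the same object
    uid, vid = ds.checkin({"title": "notes", "mountpoint": mp},
                          "/tmp/notes.txt")
    props, path = ds.checkout(uid, "", "", "", "")

bin/test-trial3.py in this commit exercises the same calls end to end.]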
+ + diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index 47e8214..0d04b4a 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -15,8 +15,6 @@ from datetime import datetime import gnomevfs import os import re -import sha -import subprocess import time import threading @@ -27,6 +25,18 @@ from olpc.datastore import utils # changing this pattern impacts _targetFile filename_attempt_pattern = re.compile('\(\d+\)$') +# capabilities +# list of strings on a backing store that indicate it provides an +# implementation of some feature another layer can expect +# "file" indicates that the store can return traditional filelike +# objects and is the baseline capability expected of most other layers +# "inplace" indicates that the backingstore will update/index the +# files as they reside on the store. Files will not be moved. +# "versions" indicates that the store will keep revisions of content +# internally. This has implications at the search layer which may +# perform different operations on stores that exhibit this capability + + class BackingStore(object): """Backing stores manage stable storage. We abstract out the management of file/blob storage through this class, as well as the @@ -39,6 +49,8 @@ class BackingStore(object): olpc.datastore.model are provided. """ + capabilities = () + def __init__(self, uri, **kwargs): """The kwargs are used to configure the backend so it can provide its interface. See specific backends for details @@ -127,6 +139,8 @@ class FileBackingStore(BackingStore): STORE_NAME = "store" INDEX_NAME = "index" DESCRIPTOR_NAME = "metainfo" + + capabilities = ("file") def __init__(self, uri, **kwargs): """ FileSystemStore(path=<root of managed storage>) @@ -308,10 +322,9 @@ class FileBackingStore(BackingStore): break targetpath = "%s(%s)%s" % (targetpath, attempt, ext) - - if subprocess.call(['cp', path, targetpath]): - raise OSError("unable to create working copy") - return open(targetpath, 'rw') + + bin_copy.bin_copy(path, targetpath) + return targetpath def _mapContent(self, uid, fp, path, env=None): """map a content object and the file in the repository to a @@ -329,17 +342,11 @@ class FileBackingStore(BackingStore): targetfile = self._targetFile(uid, target, ext, env) content.file = targetfile - if self.options.get('verify', False): - c = sha.sha() - for line in targetfile: - c.update(line) - fp.seek(0) - if c.hexdigest() != content.checksum: - raise ValueError("Content for %s corrupt" % uid) return content def _writeContent(self, uid, filelike, replace=True, target=None): content = None + if not filelike: return if target: path = target else: path = self._translatePath(uid) @@ -348,27 +355,12 @@ class FileBackingStore(BackingStore): raise KeyError("objects with path:%s for uid:%s exists" %( path, uid)) - if filelike.name != path: - # protection on inplace stores - bin_copy.bin_copy(filelike.name, path) - - def _checksum(self, filename): - c = sha.sha() - fp = open(filename, 'r') - for line in fp: - c.update(line) - fp.close() - return c.hexdigest() + bin_copy.bin_copy(filelike, path) # File Management API def create(self, props, filelike): uid = self.indexmanager.index(props, filelike) - if filelike: - if isinstance(filelike, basestring): - # lets treat it as a filename - filelike = open(filelike, "r") - filelike.seek(0) - self._writeContent(uid, filelike, replace=False) + self._writeContent(uid, filelike, replace=False) return uid def get(self, uid, env=None, allowMissing=False, 
includeFile=False): @@ -387,15 +379,6 @@ class FileBackingStore(BackingStore): if 'uid' not in props: props['uid'] = uid self.indexmanager.index(props, filelike) - filename = filelike - if filelike: - if isinstance(filelike, basestring): - # lets treat it as a filename - filelike = open(filelike, "r") - filelike.seek(0) - self.set(uid, filelike) - - def set(self, uid, filelike): self._writeContent(uid, filelike) def delete(self, uid, allowMissing=True): @@ -433,7 +416,8 @@ class InplaceFileBackingStore(FileBackingStore): """ STORE_NAME = ".olpc.store" - + capabilities = ("file", "inplace") + def __init__(self, uri, **kwargs): # remove the 'inplace:' scheme uri = uri[len('inplace:'):] @@ -485,7 +469,8 @@ class InplaceFileBackingStore(FileBackingStore): source = os.path.join(dirpath, fn) relative = source[len(self.uri)+1:] - + source = os.path.abspath(source) + result, count = self.indexmanager.search(dict(filename=relative)) mime_type = gnomevfs.get_mime_type(source) stat = os.stat(source) @@ -507,7 +492,7 @@ class InplaceFileBackingStore(FileBackingStore): # happen) content = result.next() uid = content.id - saved_mtime = content.get_property('mtime') + saved_mtime = content.get_property('mtime', None) if mtime != saved_mtime: self.update(uid, metadata, source) self.indexmanager.flush() @@ -518,13 +503,6 @@ class InplaceFileBackingStore(FileBackingStore): except KeyError: return None return os.path.join(self.uri, content.get_property('filename', uid)) -## def _targetFile(self, uid, target=None, ext=None, env=None): -## # in this case the file should really be there unless it was -## # deleted in place or something which we typically isn't -## # allowed -## # XXX: catch this case and remove the index -## targetpath = self._translatePath(uid) -## return open(targetpath, 'rw') # File Management API def create(self, props, filelike): @@ -532,17 +510,13 @@ class InplaceFileBackingStore(FileBackingStore): # don't touch it proposed_name = None if filelike: - if isinstance(filelike, basestring): - # lets treat it as a filename - filelike = open(filelike, "r") - filelike.seek(0) # usually with USB drives and the like the file we are # indexing is already on it, however in the case of moving # files to these devices we need to detect this case and # place the file proposed_name = props.get('filename', None) if not proposed_name: - proposed_name = os.path.split(filelike.name)[1] + proposed_name = os.path.split(filelike)[1] # record the name before qualifying it to the store props['filename'] = proposed_name proposed_name = os.path.join(self.uri, proposed_name) diff --git a/src/olpc/datastore/bin_copy.py b/src/olpc/datastore/bin_copy.py index 1be1b6b..81beadb 100644 --- a/src/olpc/datastore/bin_copy.py +++ b/src/olpc/datastore/bin_copy.py @@ -3,6 +3,11 @@ import os, subprocess def bin_copy(src, dest, mode=0600): try: + if not isinstance(src, basestring): + import pdb;pdb.set_trace() + if not isinstance(dest, basestring): + import pdb;pdb.set_trace() + subprocess.check_call(['/bin/cp', src, dest]) except subprocess.CalledProcessError: raise OSError("Copy failed %s %s" % (src, dest)) diff --git a/src/olpc/datastore/config.py b/src/olpc/datastore/config.py new file mode 100644 index 0000000..5a3483f --- /dev/null +++ b/src/olpc/datastore/config.py @@ -0,0 +1,13 @@ + +# the name used by the logger +DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' + +# DBus config +DS_SERVICE = "org.laptop.sugar.DataStore" +DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore" +DS_OBJECT_PATH = "/org/laptop/sugar/DataStore" + + +# 
XAttr namespace +XATTR_NAMESPACE = "datastore" + diff --git a/src/olpc/datastore/converter.py b/src/olpc/datastore/converter.py index 75f7568..9f32a24 100644 --- a/src/olpc/datastore/converter.py +++ b/src/olpc/datastore/converter.py @@ -64,6 +64,7 @@ class subprocessconverter(object): def __call__(self, filename): data = {} + data['source'] = filename if self.require_target: # XXX: methods that return something bad here @@ -90,10 +91,15 @@ class subprocessconverter(object): # reading if os.path.exists(target): os.unlink(target) - + class noop(object): def verify(self): return True def __call__(self, filename): + return open(filename, 'r') + +class decoder(object): + def verify(self): return True + def __call__(self, filename): return codecs.open(filename, 'r', 'utf-8') class Converter(object): diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py index 19342e2..0a37f49 100644 --- a/src/olpc/datastore/datastore.py +++ b/src/olpc/datastore/datastore.py @@ -17,13 +17,10 @@ import dbus.service import dbus.mainloop.glib from olpc.datastore import utils +from olpc.datastore import lru +from olpc.datastore.sxattr import Xattr +from olpc.datastore.config import * -# the name used by the logger -DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' - -DS_SERVICE = "org.laptop.sugar.DataStore" -DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore" -DS_OBJECT_PATH = "/org/laptop/sugar/DataStore" logger = logging.getLogger(DS_LOG_CHANNEL) @@ -36,6 +33,9 @@ class DataStore(dbus.service.Object): self.backends = [] self.mountpoints = {} self.root = None + + # maps uids to the mountpoint of the tip revision + self._mpcache = lru.LRU(20) # global handle to the main look dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) @@ -56,7 +56,6 @@ class DataStore(dbus.service.Object): self.backends.append(backendClass) ## MountPoint API - #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, in_signature="sa{sv}", out_signature='s') @@ -98,7 +97,6 @@ class DataStore(dbus.service.Object): """ return [mp.descriptor() for mp in self.mountpoints.itervalues()] - #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, in_signature="s", out_signature="") @@ -174,7 +172,7 @@ class DataStore(dbus.service.Object): def _resolveMountpoint(self, mountpoint=None): if isinstance(mountpoint, dict): mountpoint = mountpoint.pop('mountpoint', None) - + if mountpoint is not None: # this should be the id of a mount point mp = self.mountpoints[mountpoint] @@ -183,8 +181,175 @@ class DataStore(dbus.service.Object): mp = self.root return mp + + def _mountpointFor(self, uid): + # XXX: this is flawed in that we really need to resolve merge + # cases where objects exist in branches over two or more + # stores + # (and this have the same rev for different heads) + + # first, is it in the LRU cache? 
+ if uid in self._mpcache: + return self._mpcache[uid] + + # attempt to resolve (and cache the mount point) + # this is the start of some merge code + on = [] + for mp in self.mountpoints.itervalues(): + try: + if "versions" in mp.capabilities: + c = mp.get(uid, allow_many=True) + if c: on.append((mp, c)) + else: + c = mp.get(uid) + if c: on.append((mp, c)) + except KeyError: + pass + + if on: + # find the single most recent revision + def versionCmp(x, y): + mpa, ca = x # mp, content + mpb, cb = y + # first by revision + r = cmp(int(ca.get_property('vid')), + int(cb.get_property('vid'))) + if r != 0: return r + # if they have the same revision we've detected a + # branch + # we should resolve the common parent in a merge case, + # etc. + # XXX: here we defer to time + return cmp(ca.get_property('mtime', 0), cb.get_property('mtime', 0)) + + + if len(on) > 1: + on.sort(versionCmp) + # the first mount point should be the most recent + # revision + mp = on[0][0] + else: + # No store has this uid. Doesn't mean it doesn't exist, + # just that its not mounted + mp = None + + self._mpcache[uid] = mp + return mp + # PUBLIC API - #@utils.sanitize_dbus + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='a{sv}s', + out_signature='ss') + def checkin(self, props, filelike=None): + """Check in a new content object. When uid is included in the + properties this is considered an update to the content object + which automatically creates a new revision. + + This method returns the uid and version id tuple. + """ + mp = None + # dbus cleaning + props = utils._convert(props) + + if filelike is not None: + filelike = str(filelike) + if filelike: + # attempt to scan the xattr namespace for information that can + # allow us to process this request more quickly + x = Xattr(filelike, XATTR_NAMESPACE) + known = x.asDict() + if "mountpoint" not in props and "mountpoint" in known: + mp = self._resolveMountpoint(known['mountpoint']) + + if not mp: + mp = self._resolveMountpoint(props) + + if "versions" not in mp.capabilities: + vid = "1" # we don't care about vid on + # non-versioning stores + uid = props.get('uid') + if uid: + # this is an update operation + # and uid refers to the single object + # so an update is ok + mp.update(uid, props, filelike) + else: + # this is a create operation + uid = mp.create(props, filelike) + else: + # use the unified checkin on the backingstore + uid, vid = mp.checkin(props, filelike) + + self._mpcache[uid] = mp + return uid, str(vid) + + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='sssss', + out_signature='a{sv}s') + def checkout(self, uid, vid=None, mountpoint=None, target=None, dir=None): + """Check out a revision of a document. Returns the properties + of that version and a filename with the contents of that + version. Generally calls to this should have the mountpoint + supplied as their may be different repositories with unmerged + histories of the same object. 
+ """ + ## XXX: review this when repo merge code is in place + + # dbus cleanup + uid = str(uid) + vid = vid and str(vid) or None + mountpoint = mountpoint and str(mountpoint) or None + target = target and str(target) or None + dir = dir and str(dir) or None + + mp = self._mountpointFor(uid) + if not mp: + raise KeyError("Object with %s uid not found in datastore" % uid) + + if "versions" not in mp.capabilities: + content = mp.get(uid) + props = content.properties + props['mountpoint'] = content.backingstore.id + filename = content.filename + return props, filename + else: + return mp.checkout(uid, vid, target=target, dir=dir) + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='ssss', + out_signature='a{sv}s') + def copy(self, uid, vid=None, mountpoint=None, target_mountpoint=None): + # attempt to copy an object from one mount to another + # if the uid/vid pair is not found a KeyError is raised + # if the mount points don't exist a KeyError is raised + # if both stores support versioning then the whole revision + # history is copied, if not this behaves as a checkout on the + # store + mp = self._resolveMountpoint(mountpoint) + mp2 = self._resolveMountpoint(target_mountpoint) + if not mp:raise KeyError("No mount %s" % mountpoint) + if not mp2:raise KeyError("No mount %s" % target_mountpoint) + + vs = "versions" in mp.capabilities + vt = "versions" in mp2.capabilities + content = mp.get(uid) + props = content.properties + + if not vs or not vt: + del props['uid'] + filename = content.filename + uid = mp2.create(props, filename) + return uid, '1' + else: + # in this case we can copy the whole version history + mp2.raw_copy(mp.raw_sources()) + # this creates an extra checkout, but as long as its not + # an inplace store it should be cleaner on index + mp2.update(props, content.filename) + + + # OLD API @dbus.service.method(DS_DBUS_INTERFACE, in_signature='a{sv}s', out_signature='s') @@ -228,13 +393,12 @@ class DataStore(dbus.service.Object): for hit in res: existing = d.get(hit.id) if not existing or \ - existing.get_property('mtime') < hit.get_property('mtime'): + existing.get_property('vid') < hit.get_property('vid'): # XXX: age/version check d[hit.id] = hit return d, len(d) - #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, in_signature='a{sv}', out_signature='aa{sv}u') @@ -330,7 +494,9 @@ class DataStore(dbus.service.Object): for r in results: props = {} props.update(r.properties) - + + # on versioning stores uid will be different + # than r.id but must be set if 'uid' not in props: props['uid'] = r.id @@ -351,11 +517,11 @@ class DataStore(dbus.service.Object): return (d, len(results)) - def get(self, uid): - mp = self._resolveMountpoint() + def get(self, uid, rev=None, mountpoint=None): + mp = self._resolveMountpoint(mountpoint) c = None try: - c = mp.get(uid) + c = mp.get(uid, rev) if c: return c except KeyError: pass @@ -363,29 +529,36 @@ class DataStore(dbus.service.Object): if not c: for mp in self.mountpoints.itervalues(): try: - c = mp.get(uid) + if "versions" in mp.capabilities: + c = mp.get(uid, rev) + else: + c = mp.get(uid) + if c: break except KeyError: continue return c - #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', + in_signature='sss', out_signature='s') - def get_filename(self, uid): - content = self.get(uid) + def get_filename(self, uid, vid=None, mountpoint=None): + vid = vid and str(vid) or None + mountpoint = mountpoint and str(mountpoint) or None + content = self.get(uid, vid, mountpoint) if content: try: 
return content.filename except AttributeError: pass return '' - #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', + in_signature='sss', out_signature='a{sv}') - def get_properties(self, uid): - content = self.get(uid) + def get_properties(self, uid, vid=None, mountpoint=None): + vid = vid and str(vid) or None + mountpoint = mountpoint and str(mountpoint) or None + + content = self.get(uid, vid, mountpoint) props = content.properties props['mountpoint'] = content.backingstore.id return props @@ -407,7 +580,6 @@ class DataStore(dbus.service.Object): return results - #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, in_signature='sa{sv}s', out_signature='') @@ -426,22 +598,27 @@ class DataStore(dbus.service.Object): @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Updated(self, uid): pass - #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', + in_signature='ss', out_signature='') - def delete(self, uid): - content = self.get(uid) + def delete(self, uid, mountpoint=None): + content = self.get(uid, mountpoint=mountpoint) if content: content.backingstore.delete(uid) self.Deleted(uid) logger.debug("deleted %s" % uid) + @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Deleted(self, uid): pass + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='', + out_signature='') def stop(self): - """shutdown the service""" + """shutdown the service. this is intended only for automated + testing or system shutdown.""" self.Stopped() self._connection.get_connection()._unregister_object_path(DS_OBJECT_PATH) for mp in self.mountpoints.values(): mp.stop() diff --git a/src/olpc/datastore/hg_backingstore.py b/src/olpc/datastore/hg_backingstore.py new file mode 100644 index 0000000..7a4d1ba --- /dev/null +++ b/src/olpc/datastore/hg_backingstore.py @@ -0,0 +1,426 @@ +from olpc.datastore.backingstore import FileBackingStore +from olpc.datastore.sxattr import Xattr +from olpc.datastore.utils import create_uid +from olpc.datastore.bin_copy import bin_copy +from olpc.datastore.config import XATTR_NAMESPACE + +from mercurial import repo, filelog, transaction, util, revlog +import os, sys, tempfile + + +class localizedFilelog(filelog.filelog): + def __init__(self, opener, path, cwd=None): + self._fullpath = os.path.realpath(cwd) + revlog.revlog.__init__(self, opener, + self.encodedir(path + ".i")) + @property + def rindexfile(self): return os.path.join(self._fullpath, self.indexfile) + + @property + def rdatafile(self): return os.path.join(self._fullpath, self.datafile) + +class FileRepo(repo.repository): + """A very simple file repository that functions without the need + for global changesets or a working copy. 
+ """ + + def __init__(self, path): + self.basepath = path + self.root = os.path.realpath(self.basepath) + self.repo = "data" + self.datadir = os.path.join(self.basepath, ".ds") + if not os.path.exists(self.basepath): + os.makedirs(self.basepath) + + def file(self, path): + eopen = util.encodedopener(util.opener(self.datadir), util.encodefilename) + fl = localizedFilelog(eopen, path, cwd=self.datadir) + return fl + + + def __contains__(self, path): + """Is this path already managed by the repo?""" + f = self.file(path) + p = f.rindexfile + return os.path.exists(p) + + def raw_copy(self, filenames): + # create localized repository versions of the raw data from + # another repo + # this doesn't do indexing or anything like that + for fn in filenames: + srcdir, n = os.path.split(fn) + target = self._rjoin(n) + bin_copy(fn, target) + + + def raw_sources(self): + # return list of filenames which must be copied + return [d for d in self._rdata() if d] + + def _rjoin(self, path): + # repository path + return os.path.join(self.basepath, ".ds", path) + + def _rdata(self, path): + """return the index and datafile names in the repository""" + f = self.file(path) + base = os.path.join(self.basepath, ".ds") + i = os.path.join(base, f.rindexfile) + d = os.path.join(base, f.rdatafile) + return (i and i or None, d and d or None) + + def put(self, path, source, parent=None, text=None, meta=None): + """Create a new revision of the content indicated by + 'path'. 'source' is the filename containing the data we wish + to commit. parent when present is the parent revision this + data comes from. When parent is not provided we first look at + the source file for the xattr 'user.datastore.revision', if + that is present we assume it is the parent revision of the + element in path. When absent we assume that this is a delta to + the tip. + + + @return rev, parent, parent is tip, changeset + - rev is this files new revision number + - parent is the id of the revision used as the parent + - parent_is_tip indicates if the parent was the most recent + head + - changeset is the random uid of this transaction used in + merges + """ + + # we don't have a real working copy model. The source file may + # exist in some path or location well outside the repo (and + # most likely does) + f = self.file(path) + data = open(source, 'r').read() + tip = f.tip() + if not parent: + x = Xattr(source, XATTR_NAMESPACE) + # attempt to resolve the revision number from the property + parent = x.get('revision') + expected_uid = x.get('uid') + + if parent: + parent = int(parent) # from unicode + else: + # there wasn't an attribute on the file + # this can happen if the file was created by the + # client or + # the filesystem didn't support xattr + # in this case we assume the revision is the tip + parent = tip + + if isinstance(parent, int): + # its a revision number, resolve it to a node id + try: + parent = f.node(parent) + except IndexError: + # XXX: log a warning, the parent passed in is + # invalid + # or... could have been archived I suppose + parent = None + if parent and not f.cmp(parent, data): + # they are the same + return + + # assume some default values for missing metadata + # changeset is important here. 
Files don't properly belong to + # change sets, but this uid is used to discriminate versions + # with identical revision numbers from different sources + changeset = create_uid() + if not meta: meta = {} + meta.setdefault('text', text and text or "automatic commit") + meta['changeset'] = changeset + + # commit the data to the log + t = self.transaction() + rev = f.count() + 1 + f.add(data, meta, t, rev, parent) + t.close() + + return rev, parent, parent == tip, changeset + + def transaction(self): + return transaction.transaction(sys.stderr.write, open, "journal") + + + def tip(self, path): + # return the revision id that is the tip for a given path + l = self.file(path) + return l.rev(l.tip()) + + def revision(self, path, rev): + """Given the path name return the data associated with the raw + revision""" + # we really need a less memory intensive way of doing this + # stream the data to stable media as it processes the delta + if path not in self: raise KeyError("%s is not managed by repo" % path) + l = self.file(path) + if isinstance(rev, int): + n = l.node(rev) + else: + n = rev + return l.read(n) + + def dump(self, path, rev, target, mountpoint_id, changeset): + """Dump the contents of a revision to the filename indicated + by target""" + fp = open(target, "w") + fp.write(self.revision(path, rev)) + fp.close() + # tag the checkout with its current revision this is used to + # aid in parent chaining on commits this is equivalent to the + # file_rev property in the model. + # XXX: need to check for when xattr is not supported by the fs better + x = Xattr(target, XATTR_NAMESPACE) + x['revision'] = str(rev) + x['uid'] = path # this is from the repo where the names are + # uids + x['mountpoint'] = mountpoint_id # to quickly recover the mountpoint + # this came from + x['changeset'] = changeset + + def remove(self, path): + """Hard remove the whole version history of an object""" + i, d = self._rdata(path) + if i and os.path.exists(i): + os.unlink(i) + # for small files d will not exist as the data is inlined to + # the the index + if d and os.path.exists(d): + os.unlink(d) + + def strip(self, path, rev): + """attempt to remove a given revision from the history of + path""" + f = self.file(path) + f.strip(rev, rev) + + +class HgBackingStore(FileBackingStore): + """This backingstore for the datastore supports versioning by + keeping a barebones Mercurial repository under the hood + """ + capabilities = ("file", "versions") + + def __init__(self, uri, **kwargs): + # the internal handle to the HgRepo + self.repo = None + uri = uri[len('hg:'):] + super(HgBackingStore, self).__init__(uri, **kwargs) + + @staticmethod + def parse(uri): + return uri.startswith("hg:") + + def check(self): + if not os.path.exists(self.uri): return False + if not os.path.exists(self.base): return False + return True + + def initialize(self): + super(FileBackingStore, self).initialize() + self.repo = FileRepo(self.base) + + def load(self): + super(HgBackingStore, self).load() + # connect the repo + if not self.repo: + self.repo = FileRepo(self.base) + + + def tip(self, uid): + return self.repo.tip(uid) + + # File Management API + def create(self, props, filelike): + # generate the uid ourselves. 
we do this so we can track the + # uid and changeset info + # Add it to the index + uid = create_uid() + props['uid'] = uid + props.setdefault('message', 'initial') + uid, rev = self.checkin(props, filelike) + return uid + + def get(self, uid, rev=None, env=None, allow_many=False): + # we have a whole version chain, but get contracts to + # return a single entry. In this case we default to 'tip' + if not rev: + rev = self.repo.tip(uid) + results, count = self.indexmanager.get_by_uid_prop(uid, rev) + if count == 0: + raise KeyError(uid) + elif count == 1 or allow_many: + return results.next() + + raise ValueError("Got %d results for 'get' operation on %s" %(count, uid)) + +###################################################################### +# XXX: This whole policy is botched unless the journal grows an +# # interface to display other versions of the main document +# # which it doesn't have. If I do this then we don't see those +# # versions in the journal and have no means to access +# # them. For the time being we just index everything and let +# # date sort them out. +###################################################################### +# # recover the old records for this uid +# # update may rewrite/remove 1-n documents +# # depending on the number of heads and so on +# # this needs to be done under a single transaction +# # actions will be a set of commands passed to +# # xapianindex.enque +# # the loop will process the entire action set at once +# +# # we need to pass head/tip tags from the parent to the child +# # as a result of the update +# # XXX: currently we are only going to index the HEADs and TIP +# # revisions in the repository (and those marked with KEEP). +# # This means that when we update +# # with these tags: +# # we can remove the old version from xapian +# # w/o these tags: +# # it gets a head tag, implying a branch +# # +# # because the keep flag indicates content is needed to be kept +# # locally we have two real choices, either +# # move it forward with each revision +# # keep only the original tagged version +# # and index the new one as well (because it will have the +# # HEAD tag) +########################################################################## + def update(self, uid, props, filelike): + props['uid'] = uid + uid, rev = self.checkin(props, filelike) + return uid + + + def delete(self, uid, rev=None): + # delete the object at 'uid', when no rev is passed tip is + # removed + if rev is None: + rev = self.repo.tip(uid) + + c = self.get(uid, rev) + self.indexmanager.delete(c.id) + self.repo.strip(uid, rev) + + def _targetFile(self, uid, target=None, ext=None, env=None): + c = self.indexmanager.get(uid) + rev = int(c.get_property('vid')) + #rev -= 1 # adjust for 0 based counting + self.repo.dump(uid, rev, target, self.id, c.get_property('changeset')) + return open(target, 'rw') + + + def checkin(self, props, filelike): + """create or update the content object, creating a new + version""" + uid = props.get("uid") + c = None + if uid is None: + uid = create_uid() + props['vid'] = "1" + else: + # is there an existing object with this uid? + # XXX: if there isn't it should it be an error? 
+ r, count = self.indexmanager.get_by_uid_prop(uid, + props.get('vid', 'tip')) + if count: + # XXX: if count > 1 case should be handled + c = r.next() + # copy the value forward + old_props = c.properties.copy() + old_props.update(props) + props = old_props + # except vid which we increment here + props['vid'] = str(int(props['vid']) + 1) + + props['uid'] = uid + if filelike: + message = props.setdefault('message', 'initial') + # if a vid is passed in treat that as a parent revision + # because this is an interaction with the repo however we + # need to resolve that versions file_rev as the parent for + # 'put' + # it maybe that it didn't previously have a file at all in + # which case we must pass None + + # where c is the content lookup from before + parent = None + try: + parent = c.get_property('file_rev', None) + except: pass + + rev, parent, isTip, changeset = self.repo.put(uid, filelike, + parent, message, + meta=dict(uid=uid)) + # the create case is pretty simple + # mark everything with defaults + props['changeset'] = changeset + props['file_rev'] = str(rev) + self.indexmanager.index(props, filelike) + return uid, props['vid'] + + def checkout(self, uid, vid=None, target=None, dir=None): + """checkout the object with this uid at vid (or HEAD if + None). Returns (props, filename)""" + # use the repo to drive the property search + f = self.repo.file(uid) + exists = f.count() > 0 + + if vid: + vid = f.node(int(vid) -1) # base 0 counting + else: + vid = f.tip() + rev = f.rev(vid) + # there will only be one thing with the changeset id of this + # 'f' + m = f._readmeta(vid) + changeset = m.get('changeset') + if changeset: + objs, count = self.indexmanager.search(dict(changeset=changeset)) + assert count == 1 + obj = objs.next() + elif not exists: + # There isn't any file content with this entry + objs = self.indexmanager.get_by_uid_prop(uid) + obj = objs[0].next() + + # we expect file content + if exists: + if not target: + target, ext = obj.suggestName() + if not target: + fd, fn = tempfile.mkstemp(suffix=ext, dir=dir) + target = fn + os.close(fd) + + if not target.startswith('/'): + if not dir: dir = "/tmp" + target = os.path.join(dir, target) + + if target: + self.repo.dump(uid, rev, target, self.id, changeset) + + if not target: target = "" + return obj.properties, target + +if __name__ == "__main__": + import rlcompleter2 + rlcompleter2.setup(verbose=0) + + TESTLOC = "/tmp/fltest" + os.system('rm -rf %s' % TESTLOC) + + c = FileRepo(TESTLOC) + + n = c.blit("foo", "this is a test") + m = c.blit("bar", "another test") + + + o = c.blit("foo", "\nanother line", mode="a") + + c.revisions("foo") diff --git a/src/olpc/datastore/lru.py b/src/olpc/datastore/lru.py new file mode 100644 index 0000000..6e0b6be --- /dev/null +++ b/src/olpc/datastore/lru.py @@ -0,0 +1,81 @@ +# LRU cache +# from http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/252524 + +class Node(object): + __slots__ = ['prev', 'next', 'me'] + def __init__(self, prev, me): + self.prev = prev + self.me = me + self.next = None + +class LRU: + """ + Implementation of a length-limited O(1) LRU queue. + Built for and used by PyPE: + http://pype.sourceforge.net + Copyright 2003 Josiah Carlson. 
+ """ + def __init__(self, count, pairs=[]): + self.count = max(count, 1) + self.d = {} + self.first = None + self.last = None + for key, value in pairs: + self[key] = value + def __contains__(self, obj): + return obj in self.d + def __getitem__(self, obj): + a = self.d[obj].me + self[a[0]] = a[1] + return a[1] + def __setitem__(self, obj, val): + if obj in self.d: + del self[obj] + nobj = Node(self.last, (obj, val)) + if self.first is None: + self.first = nobj + if self.last: + self.last.next = nobj + self.last = nobj + self.d[obj] = nobj + if len(self.d) > self.count: + if self.first == self.last: + self.first = None + self.last = None + return + a = self.first + a.next.prev = None + self.first = a.next + a.next = None + del self.d[a.me[0]] + del a + def __delitem__(self, obj): + nobj = self.d[obj] + if nobj.prev: + nobj.prev.next = nobj.next + else: + self.first = nobj.next + if nobj.next: + nobj.next.prev = nobj.prev + else: + self.last = nobj.prev + del self.d[obj] + def __iter__(self): + cur = self.first + while cur != None: + cur2 = cur.next + yield cur.me[1] + cur = cur2 + def iteritems(self): + cur = self.first + while cur != None: + cur2 = cur.next + yield cur.me + cur = cur2 + def iterkeys(self): + return iter(self.d) + def itervalues(self): + for i,j in self.iteritems(): + yield j + def keys(self): + return self.d.keys() diff --git a/src/olpc/datastore/model.py b/src/olpc/datastore/model.py index 14bb4b9..1e13a13 100644 --- a/src/olpc/datastore/model.py +++ b/src/olpc/datastore/model.py @@ -13,10 +13,11 @@ __license__ = 'The GNU Public License V2+' import datetime import mimetypes import os +import re import time import warnings from olpc.datastore.utils import timeparse - +import dbus # XXX: Open issues # list properties - Contributors (a, b, c) @@ -27,6 +28,9 @@ from olpc.datastore.utils import timeparse propertyTypes = {} _marker = object() +# used in cases where we convert title to a filename +titleSanitizer = re.compile("[\W]+") + def registerPropertyType(kind, get, set, xapian_sort_type=None, defaults=None, for_xapian=None, from_xapain=None): propertyTypes[kind] = PropertyImpl(get, set, xapian_sort_type, @@ -169,7 +173,7 @@ class Model(object): # Properties we don't automatically include in properties dict -EXCLUDED_PROPERTIES = ['fulltext', ] +EXCLUDED_PROPERTIES = ['fulltext', 'changeset', 'file_rev'] class Content(object): """A light weight proxy around Xapian Documents from secore. 
@@ -183,7 +187,8 @@ class Content(object): self._backingstore = backingstore self._file = None self._model = model - + self._file = None + def __repr__(self): return "<%s %s>" %(self.__class__.__name__, self.properties) @@ -227,7 +232,9 @@ class Content(object): # checkout name filename = self.get_property('filename', None) ext = self.get_property('ext', '') - + title = self.get_property('title', '') + title = titleSanitizer.sub("_", title) + if filename: # some backingstores keep the full relative path filename = os.path.split(filename)[1] @@ -242,16 +249,23 @@ class Content(object): ext = mimetypes.guess_extension(mt) # .ksh is a strange ext for plain text if ext and ext == '.ksh': ext = '.txt' + if title: return title, ext if ext: return None, ext + + if title: return title, None + return None, None def get_file(self): - if not hasattr(self, "_file") or not self._file or \ - self._file.closed is True: + if not self._file or self._file.closed is True: target, ext = self.suggestName() try: targetfile = self.backingstore._targetFile(self.id, target, ext) - self._file = targetfile + if not targetfile: + self._file = None + return self._file + + self._file = open(targetfile, "rw") except OSError: self._file = None return self._file @@ -261,16 +275,21 @@ class Content(object): file = property(get_file, set_file) @property - def filename(self): return os.path.abspath(self.file.name) - + def filename(self): + if self.file: + return os.path.abspath(self.file.name) + return '' + @property def contents(self): return self.file.read() @property def backingstore(self): return self._backingstore - + + # id of this record (unique in index) @property - def id(self): return self._doc.id + def id(self): + return self._doc.id @property def data(self): return self._doc.data @@ -280,8 +299,15 @@ def noop(value): return value import re base64hack = re.compile("(\S{212})") -def base64enc(value): return ' '.join(base64hack.split(value.encode('base64'))) -def base64dec(value): return value.replace(' ', '').decode('base64') +def base64enc(value): + if isinstance(value, list): + # its a bytearray + value = ''.join((str(v) for v in value)) + import pdb;pdb.set_trace() + return ' '.join(base64hack.split(value.encode('base64'))) + +def base64dec(value): + return value.replace(' ', '').decode('base64') DATEFORMAT = "%Y-%m-%dT%H:%M:%S" def date2string(value): return value.replace(microsecond=0).isoformat() @@ -295,6 +321,7 @@ def encode_datetime(value): def decode_datetime(value): # convert a float to a local datetime + if not value: return None return datetime.datetime.fromtimestamp(float(value)).isoformat() def datedec(value, dateformat=DATEFORMAT): @@ -322,9 +349,10 @@ registerPropertyType('text', noop, noop, 'string', {'store' : True, 'collapse' : True, }) -registerPropertyType('binary', noop, noop, None, {'store' : True, - 'exact' : False, - 'sortable' : False}) +# Now the convention is to directly use DBus.ByteArray +registerPropertyType('binary', str, dbus.ByteArray, None, {'store' : True, + 'exact' : False, + 'sortable' : False}) registerPropertyType('int', str, int, 'float', {'store' : True, 'exact' : True, @@ -346,8 +374,16 @@ registerPropertyType('date', dateenc, datedec, 'float', {'store' : True, defaultModel = Model().addFields( ('fulltext', 'text'), + # unique to the object through out its history + ('uid', 'string'), # vid is version id - ('vid', 'number'), + # linear version id's don't imply direct lineage, only the relative merge order + ('vid', 'string'), + # unique token per change to help 
disambiguate for merges + ('changeset', 'string'), + ('file_rev', 'int'), # this is the revision of the file pointed to + # by this revision of the object + ('checksum', 'string'), ('filename', 'string'), ('ext', 'string'), # its possible we don't store a filename, but diff --git a/src/olpc/datastore/sxattr.py b/src/olpc/datastore/sxattr.py new file mode 100644 index 0000000..957395a --- /dev/null +++ b/src/olpc/datastore/sxattr.py @@ -0,0 +1,63 @@ +""" +simplified xattr +~~~~~~~~~~~~~~~~~~~~ +automatically manage prefixes into the xattr namespace + +""" + +__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>' +__docformat__ = 'restructuredtext' +__copyright__ = 'Copyright ObjectRealms, LLC, 2007' +__license__ = 'The GNU Public License V2+' + + +import xattr + +class Xattr(object): + def __init__(self, filename, prefix, implicitUser=True): + self.filename = filename + self.prefix=[] + if implicitUser: self.prefix.append('user') + self.prefix.append(prefix) + self.ns = '.'.join(self.prefix) + self.keyed = lambda x: '.'.join((self.ns, x)) + + def __getitem__(self, key): + v = xattr.getxattr(self.filename, self.keyed(key)) + return v.decode('utf-8') + + def __setitem__(self, key, value): + if isinstance(value, unicode): + value = value.encode("utf-8") + else: + value = str(value) + xattr.setxattr(self.filename, self.keyed(key), value) + + def __delitem__(self, key): + xattr.removexattr(self.filename, self.keyed(key)) + + def get(self, key, default=None): + try: + return self[key] + except IOError: + return default + + def iterkeys(self): + all = xattr.listxattr(self.filename) + for key in all: + if key.startswith(self.ns): + yield key[len(self.ns) + 1:] + + def keys(self): + return list(self.iterkeys()) + + def asDict(self): + d = {} + for k in self.iterkeys(): + d[k] = self[k] + return d + + def update(self, dict): + for k,v in dict.iteritems(): + self[k] = v + diff --git a/src/olpc/datastore/utils.py b/src/olpc/datastore/utils.py index 0505463..e14461c 100644 --- a/src/olpc/datastore/utils.py +++ b/src/olpc/datastore/utils.py @@ -92,6 +92,9 @@ def _convert(arg): d[str(_convert(k))] = _convert(v) return d + if isinstance(arg, dbus.ByteArray): + return arg + if isinstance(arg, dbus.Array): a = [] for item in arg: diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index d653f0e..ceb7d56 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -16,6 +16,7 @@ __license__ = 'The GNU Public License V2+' from Queue import Queue, Empty import gc import logging +import os import re import sys import time @@ -24,6 +25,7 @@ import threading import warnings import secore +import xapian as _xapian # we need to modify the QueryParser from olpc.datastore import model from olpc.datastore.converter import converter @@ -137,7 +139,12 @@ class IndexManager(object): self.write_index.flush() self.read_index.reopen() - + def enqueSequence(self, commands): + """Takes a sequence of arugments to the normal enque function + and executes them under a single lock/flush cycle + """ + self.queue.put(commands) + def enque(self, uid, vid, doc, operation, filestuff=None): # here we implement the sync/async policy # we want to take create/update operations and @@ -159,7 +166,14 @@ class IndexManager(object): if not filestuff: # In this case we are done return - + elif operation is DELETE: + # sync deletes + with self._write_lock: + self.write_index.delete(uid) + logger.info("deleted content %s" % (uid,)) + self.flush() + return + self.queue.put((uid, vid, 
doc, operation, filestuff)) def indexThread(self): @@ -172,52 +186,81 @@ class IndexManager(object): # from the item in the repo as that will become our immutable # copy. Detect those cases and use the internal filename # property or backingstore._translatePath to get at it + versions = self.versions + inplace = self.inplace + q = self.queue while self.indexer_running: # include timeout here to ease shutdown of the thread # if this is a non-issue we can simply allow it to block try: - data = self.queue.get(True, 0.025) - uid, vid, doc, operation, filestuff = data + # XXX: on shutdown there is a race where the queue is + # joined while this get blocks, the exception seems + # harmless though + data = q.get(True, 0.025) + # when we enque a sequence of commands they happen + # under a single write lock pass through the loop and + # the changes become visible at once. + + if not isinstance(data[0], (list, tuple)): + data = (data,) except Empty: - #time.sleep(1.0) continue try: with self._write_lock: - if operation is DELETE: - self.write_index.delete(uid) - logger.info("deleted content %s" % (uid,)) - elif operation is UPDATE: - # Here we handle the conversion of binary - # documents to plain text for indexing. This is - # done in the thread to keep things async and - # latency lower. - # we know that there is filestuff or it - # wouldn't have been queued - filename, mimetype = filestuff - if isinstance(filename, file): - filename = filename.name - fp = converter(filename, mimetype) - if fp: - # read in at a fixed block size, try to - # conserve memory. If this doesn't work - # we can make doc.fields a generator - while True: - chunk = fp.read(2048) - if not chunk: break - doc.fields.append(secore.Field('fulltext', chunk)) + for item in data: + uid, vid, doc, operation, filestuff = item + if operation is DELETE: + self.write_index.delete(uid) + logger.info("deleted content %s" % (uid,)) + elif operation is UPDATE: + # Here we handle the conversion of binary + # documents to plain text for indexing. This is + # done in the thread to keep things async and + # latency lower. + # we know that there is filestuff or it + # wouldn't have been queued + filename, mimetype = filestuff + if isinstance(filename, file): + filename = filename.name + if filename and not os.path.exists(filename): + # someone removed the file before + # indexing + # or the path is broken + logger.warning("Expected file for" + " indexing at %s. 
Not" + " Found" % filename) - self.write_index.replace(doc) - logger.info("update file content %s:%s" % (uid, vid)) - else: - logger.warning("""Conversion process failed for document %s %s""" % (uid, filename)) - else: - logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation)) + fp = converter(filename, mimetype=mimetype) + if fp: + # fixed size doesn't make sense, we + # shouldn't be adding fulltext unless + # it converted down to plain text in + # the first place + + while True: + chunk = fp.read(2048) + if not chunk: break + doc.fields.append(secore.Field('fulltext', chunk)) + self.write_index.replace(doc) + + if versions and not inplace: + # we know the source file is ours + # to remove + os.unlink(filename) + + logger.info("update file content %s:%s" % (uid, vid)) + else: + logger.debug("""Conversion process failed for document %s %s""" % (uid, filename)) + else: + logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation)) + # tell the queue its complete self.queue.task_done() - # we do flush on each record now + # we do flush on each record (or set for enque + # sequences) now self.flush() except: logger.exception("Error in indexer") @@ -277,33 +320,62 @@ class IndexManager(object): return d + @property + def versions(self): + if self.backingstore: + return "versions" in self.backingstore.capabilities + return False + + @property + def inplace(self): + if self.backingstore: + return "inplace" in self.backingstore.capabilities + return False + + def index(self, props, filename=None): """Index the content of an object. Props must contain the following: key -> Property() """ operation = UPDATE + # # Version handling # - # we implicitly create new versions of documents the version - # id should have been set by the higher level system + # are we doing any special handling for versions? uid = props.pop('uid', None) - vid = props.pop('vid', None) + if not uid: uid = create_uid() operation = CREATE - if vid: vid = str(float(vid) + 1.0) - else: vid = "1.0" # Property mapping via model - props = self._mapProperties(props) doc = secore.UnprocessedDocument() add = doc.fields.append - fp = None + vid = None + if self.versions: + vid = props.get("vid") + if not vid: + logger.warn("Didn't supply needed versioning information" + " on a backingstore which performs versioning") + # each versions id is unique when using a versioning store + doc.id = create_uid() + else: + doc.id = uid + + if not vid: vid = '1' + + + # on non-versioning stores this is redundant but on versioning + # stores it reference to the objects whole timeline + props['uid'] = uid + props['vid'] = vid + + props = self._mapProperties(props) filestuff = None if filename: @@ -312,11 +384,11 @@ class IndexManager(object): # and open fp? 
@@ -277,33 +320,62 @@ class IndexManager(object):
         return d
 
+    @property
+    def versions(self):
+        if self.backingstore:
+            return "versions" in self.backingstore.capabilities
+        return False
+
+    @property
+    def inplace(self):
+        if self.backingstore:
+            return "inplace" in self.backingstore.capabilities
+        return False
+
+
     def index(self, props, filename=None):
         """Index the content of an object.
         Props must contain the following:
             key -> Property()
         """
         operation = UPDATE
+        #
         # Version handling
         #
-        # we implicitly create new versions of documents the version
-        # id should have been set by the higher level system
+        # are we doing any special handling for versions?
         uid = props.pop('uid', None)
-        vid = props.pop('vid', None)
+
         if not uid:
             uid = create_uid()
             operation = CREATE
 
-        if vid: vid = str(float(vid) + 1.0)
-        else: vid = "1.0"
 
         # Property mapping via model
-        props = self._mapProperties(props)
         doc = secore.UnprocessedDocument()
         add = doc.fields.append
-        fp = None
+        vid = None
+        if self.versions:
+            vid = props.get("vid")
+            if not vid:
+                logger.warn("Didn't supply needed versioning information"
+                            " on a backingstore which performs versioning")
+            # each version's id is unique when using a versioning store
+            doc.id = create_uid()
+        else:
+            doc.id = uid
+
+        if not vid: vid = '1'
+
+
+        # on non-versioning stores this is redundant, but on versioning
+        # stores it refers to the object's whole timeline
+        props['uid'] = uid
+        props['vid'] = vid
+
+        props = self._mapProperties(props)
 
         filestuff = None
         if filename:
@@ -312,11 +384,11 @@
             # and open fp?
             mimetype = props.get("mime_type")
             mimetype = mimetype and mimetype.value or 'text/plain'
+
+            filename = os.path.abspath(filename)
             filestuff = (filename, mimetype)
 
-        doc.id = uid
-        add(secore.Field('vid', vid))
-
+
         #
         # Property indexing
         for k, prop in props.iteritems():
@@ -332,13 +404,29 @@
 
         # queue the document for processing
         self.enque(uid, vid, doc, operation, filestuff)
-        return uid
+        return doc.id
 
     def get(self, uid):
         doc = self.read_index.get_document(uid)
         if not doc: raise KeyError(uid)
         return model.Content(doc, self.backingstore, self.datamodel)
 
+    def get_by_uid_prop(self, uid, rev=None):
+        # versioning stores fetch objects by uid;
+        # when rev is passed only that particular rev is returned
+        ri = self.read_index
+        q = ri.query_field('uid', uid)
+        if rev is not None:
+            if rev == "tip":
+                rev = self.backingstore.tip(uid)
+
+            q = ri.query_filter(q, ri.query_field('vid', str(rev)))
+        results, count = self._search(q, 0, 1000, sortby="-vid")
+
+        return results, count
+
+
     def delete(self, uid):
         # does this need queuing?
         # the higher level abstractions have to handle interaction
@@ -391,13 +479,16 @@
             q = ri.query_composite(ri.OP_AND, queries)
         else:
             q = self.parse_query(query)
-
-        results = ri.search(q, start_index, end_index)
+
+        return self._search(q, start_index, end_index)
+
+    def _search(self, q, start_index, end_index, sortby=None):
+        results = self.read_index.search(q, start_index, end_index, sortby=sortby)
         count = results.matches_estimated
 
         # map the result set to model.Content items
         return ContentMappingIter(results, self.backingstore, self.datamodel), count
-
+
     def get_uniquevaluesfor(self, property):
         # XXX: this is very sketchy code
@@ -433,6 +524,12 @@
         # 'title:foo' match a document whose title contains 'foo'
         # 'title:"A tale of two datastores"' exact title match
         # '-this that' match that w/o this
+
+        # limited support for wildcard searches
+        qp = _xapian.QueryParser
+
+        flags = (qp.FLAG_LOVEHATE)
+
         ri = self.read_index
         start = 0
         end = len(query)
@@ -460,11 +557,35 @@
                 #XXX: strip quotes or not here
                 #word = query[orig+1:qm.end(1)-1]
                 word = query[orig:qm.end(1)]
+                # this is a phrase; modify the flags
+                flags |= qp.FLAG_PHRASE
                 start = qm.end(1) + 1
             if field:
                 queries.append(ri.query_field(field, word))
             else:
-                queries.append(ri.query_parse(word))
+                if word.endswith("*"):
+                    flags |= qp.FLAG_WILDCARD
+                q = self._query_parse(word, flags)
+
+                queries.append(q)
         q = ri.query_composite(ri.OP_AND, queries)
         return q
+
+
+    def _query_parse(self, word, flags=0, op=None):
+        # while newer secore does pass flags it doesn't allow control
+        # over them at the API level. We override here to support
+        # adding wildcard searching
+        ri = self.read_index
+        if op is None: op = ri.OP_AND
+        qp = ri._prepare_queryparser(None, None, op)
+        try:
+            return qp.parse_query(word, flags)
+        except _xapian.QueryParserError, e:
+            # If we got a parse error, retry without boolean operators
+            # (since these are the usual cause of the parse error).
+            return qp.parse_query(word, 0)
+
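
_query_parse above drops to the raw Xapian QueryParser so the phrase
and wildcard flags can be combined per word. Roughly the same thing
against the plain xapian bindings looks like the sketch below; the
database path and the lack of field prefixes are assumptions of the
sketch, and secore's _prepare_queryparser does the equivalent wiring
internally:

    import xapian

    db = xapian.Database("/tmp/store1/index")   # hypothetical index path
    qp = xapian.QueryParser()
    qp.set_database(db)         # wildcard expansion needs the database

    flags = xapian.QueryParser.FLAG_LOVEHATE    # allow +term / -term
    word = "pee*"
    if word.endswith("*"):
        flags |= xapian.QueryParser.FLAG_WILDCARD

    try:
        query = qp.parse_query(word, flags)
    except xapian.QueryParserError:
        # boolean operators are the usual cause of parse errors;
        # retry with no flags at all
        query = qp.parse_query(word, 0)
    print query.get_description()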
diff --git a/tests/mountpoints.txt b/tests/mountpoints.txt
index 695e7d2..9210906 100644
--- a/tests/mountpoints.txt
+++ b/tests/mountpoints.txt
@@ -147,12 +147,13 @@ primary store.
 
 >>> props['mountpoint'] = mountpoint
 >>> fn = ds.get_filename(uid)
+>>> del props['uid']
 >>> copyuid = ds.create(props, fn)
 
 >>> ds.complete_indexing()
 
->>> result, count = ds.find(dict(fulltext="four"))
+>>> result, count = ds.find(dict(query="four"))
 >>> assert count == 2
 
 We also need to test that we can copy from a normal store to an
@@ -194,7 +195,7 @@ We also need to be sure that delete commands work on inplace mounts.
 We will delete the object from the datastore and then verify that
 the file is missing.
 
->>> ds.delete(pen_copy)
+>>> ds.delete(pen_copy, mountpoint=mp3)
 >>> ds.complete_indexing()
 
 >>> os.path.exists('/tmp/store3/one.txt')
diff --git a/tests/runalltests.py b/tests/runalltests.py
index 8fee87e..a152156 100644
--- a/tests/runalltests.py
+++ b/tests/runalltests.py
@@ -27,7 +27,7 @@ doctests = [
     resource_filename(__name__, "mountpoints.txt"),
     resource_filename(__name__, "properties.txt"),
     resource_filename(__name__, "dateranges.txt"),
-
+    resource_filename(__name__, "simple_versions.txt"),
 ]
 
 doctest_options = doctest.ELLIPSIS
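
The new simple_versions.txt doctest that follows exercises the
checkin/checkout cycle end to end. Compressed into plain code, and
with the store path and file names invented for illustration, the
flow is:

    from olpc.datastore import DataStore, hg_backingstore

    ds = DataStore()
    ds.registerBackend(hg_backingstore.HgBackingStore)
    mp = ds.mount("hg:/tmp/store1", dict(title="Primary Storage"))

    # revision 1: a checkin with no uid creates the object
    uid, vid = ds.checkin({'title': 'draft'}, '/tmp/draft-1.txt')
    ds.complete_indexing()

    # checkout returns the HEAD revision by default
    props, fn = ds.checkout(uid, dir='/tmp')

    # revision 2: reuse the checked-out props, supply a new file
    props['title'] = 'final'
    uid, vid = ds.checkin(props, '/tmp/draft-2.txt')
    ds.complete_indexing()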
diff --git a/tests/simple_versions.txt b/tests/simple_versions.txt
new file mode 100644
index 0000000..d35882f
--- /dev/null
+++ b/tests/simple_versions.txt
@@ -0,0 +1,105 @@
+The Datastore supports versioning of content objects and
+metadata. In order to support this we introduce new API calls and a
+higher level set of semantic operations around versioning.
+
+Let's create a simple datastore and add some content.
+
+>>> import os
+>>> assert os.system('rm -rf /tmp/store1/') == 0
+
+>>> from olpc.datastore import DataStore
+>>> from olpc.datastore import backingstore
+>>> from olpc.datastore import hg_backingstore
+>>> from testutils import *
+
+>>> ds = DataStore()
+>>> ds.registerBackend(hg_backingstore.HgBackingStore)
+>>> mp1 = ds.mount("hg:/tmp/store1", dict(title="Primary Storage"))
+
+
+The checkin operation will create new content or update existing
+content and increment the version, all in a single operation. It
+takes as arguments a properties dictionary and an optional filename
+of the content to store with this revision.
+
+>>> fn = tmpData("Part 1 -- it begins")
+>>> uid, vid = ds.checkin({'title' : 'A day in the life'}, fn)
+
+This operation returns the uid of the object and its current version
+id.
+
+From the datastore we can now verify that this object exists and is
+indexed. To ensure this for testing we first allow the indexer to
+complete all its pending operations.
+
+>>> ds.complete_indexing()
+
+>>> results, count = ds.find("A day")
+>>> assert count == 1
+>>> assert results[0]['uid'] == uid
+
+We can also search on its content directly.
+
+>>> results, count = ds.find("Part 1")
+>>> assert count == 1
+>>> assert results[0]['uid'] == uid
+
+
+To get a copy of this file out that we can manipulate we can use the
+checkout command. By default checkout will check out the HEAD
+revision (the most recent) of a document. It returns the properties
+dictionary and the filename of the checkout, which is ours to
+manipulate.
+
+>>> props, fn = ds.checkout(uid, dir='/tmp')
+
+>>> assert props['title'] == "A day in the life"
+>>> assert props['vid'] == str(vid)
+
+>>> contents = open(fn, 'r').read()
+>>> assert contents.startswith("Part 1")
+
+Let's make a revision to this content.
+
+>>> fn2 = tmpData("Part Two -- the second helping")
+
+We are going to check in the new file using the props dict of the
+last call after making our modifications and supplying our new file.
+
+
+>>> props['title'] = "A Night in the Life"
+>>> uid, vid = ds.checkin(props, fn2)
+>>> ds.complete_indexing()
+
+
+Old versions of content are still available.
+
+>>> r, c = ds.find("begins")
+>>> assert c == 1
+
+Verify that the HEAD revision of the content is searchable by default.
+
+>>> r, c = ds.find("second")
+>>> assert c == 1
+>>> assert r[0]['uid'] == uid
+
+Let's check out the HEAD version of this document now.
+
+>>> props, rev2 = ds.checkout(uid, dir='/tmp')
+
+Check that the id and vid are correct.
+
+>>> assert props['uid'] == uid
+>>> assert props['vid'] == vid
+
+Verify that the contents of the file are as expected.
+
+>>> contents = open(rev2, 'r').read()
+>>> assert contents.startswith("Part Two")
+
+
+>>> ds.stop(); del ds
+
+
+
+
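
test_hgrepo.py below drives the lower layer directly: a FileRepo
stores successive revisions of a logical path inside the repository.
In compressed form its usage looks like this, with the put/dump
signatures exactly as the test calls them and the paths invented for
illustration:

    from olpc.datastore import hg_backingstore

    repo = hg_backingstore.FileRepo('/tmp/hgtest')
    repo.put("foo", "/tmp/data1/s1", text="initial")   # becomes rev 0
    repo.put("foo", "/tmp/data2/s2", text="updated")   # becomes rev 1
    repo.dump("foo", 0, "/tmp/co1", '', '')   # materialize rev 0 at /tmp/co1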
diff --git a/tests/test_hgrepo.py b/tests/test_hgrepo.py
new file mode 100644
index 0000000..0a05732
--- /dev/null
+++ b/tests/test_hgrepo.py
@@ -0,0 +1,115 @@
+import unittest
+from testutils import blit, tmpData
+
+from olpc.datastore import hg_backingstore
+import os
+
+DEFAULT_STORE = '/tmp/hgtest'
+DATA_1 = '/tmp/data1'
+DATA_2 = '/tmp/data2'
+
+class Test(unittest.TestCase):
+    def setUp(self):
+        os.system("rm -rf %s" % DEFAULT_STORE)
+        os.system("rm -rf %s" % DATA_1)
+        os.system("rm -rf %s" % DATA_2)
+
+        os.makedirs(DATA_1)
+        os.makedirs(DATA_2)
+
+    def tearDown(self):
+        os.system("rm -rf %s" % DEFAULT_STORE)
+        os.system("rm -rf %s" % DATA_1)
+        os.system("rm -rf %s" % DATA_2)
+
+    def test_hgrepo(self):
+        repo = hg_backingstore.FileRepo(DEFAULT_STORE)
+
+        # create a test file in DATA_1
+        TEXT_1 = "this is a test"
+        fn1 = blit(TEXT_1, os.path.join(DATA_1, "s1"))
+        # and the version we will use later in DATA_2;
+        # we do this to test tracking from different source dirs
+        TEXT_2 = "another test"
+        fn2 = blit(TEXT_2, os.path.join(DATA_2, "s2"))
+
+        # this should add foo to the repo with TEXT_1 data
+        repo.put("foo", fn1, text="initial")
+
+        # now commit fn2 to the same path; this will create another
+        # revision of the existing data
+        repo.put("foo", fn2, text="updated")
+
+        # now verify access to both revisions and their data;
+        # we check things out into DATA_1
+        co1 = os.path.join(DATA_1, "co1")
+        co2 = os.path.join(DATA_1, "co2")
+
+        repo.dump("foo", 0, co1, '', '')
+        repo.dump("foo", 1, co2, '', '')
+
+        assert open(co1, 'r').read() == TEXT_1
+        assert open(co2, 'r').read() == TEXT_2
+
+    def test_hgbs(self):
+        bs = hg_backingstore.HgBackingStore("hg:%s" % DEFAULT_STORE)
+        bs.initialize_and_load()
+        bs.create_descriptor()
+        desc = bs.descriptor()
+        assert 'id' in desc
+        assert 'uri' in desc
+        assert 'title' in desc
+        assert desc['title'] is not None
+
+        d = """This is a test"""
+        d2 = "Different"
+
+        uid, rev = bs.checkin(dict(title="A", filename="a.txt"), tmpData(d))
+
+        bs.complete_indexing()
+
+        props, fn = bs.checkout(uid)
+
+        assert props.get('title') == "A"
+        got = open(fn, 'r').read()
+        assert got == d
+
+        props['title'] = "B"
+        uid, rev = bs.checkin(props, tmpData(d2))
+
+        bs.complete_indexing()
+
+        props, fn = bs.checkout(uid)
+        assert props.get('title') == "B"
+        got = open(fn, 'r').read()
+        assert got == d2
+
+        # go back and check out the first version
+        props, fn = bs.checkout(uid, 1)
+        assert props.get('title') == "A"
+        got = open(fn, 'r').read()
+        assert got == d
+
+        bs.delete(uid, props['vid'])
+        bs.complete_indexing()
+
+        # the deleted revision is gone
+        self.failUnlessRaises(KeyError, bs.get, uid, 1)
+
+## props, fn = bs.checkout(uid)
+
+## import pdb;pdb.set_trace()
+## assert props.get('title') == "A"
+## got = open(fn, 'r').read()
+## assert got == d
+
+
+def test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(Test))
+    return suite
+
+if __name__ == "__main__":
+    unittest.main()
+
diff --git a/tests/test_model.py b/tests/test_model.py
index 6d171c1..5de0b45 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -6,6 +6,8 @@ from olpc.datastore import model, backingstore
 import datetime
 import os
 
+import logging
+logging.basicConfig()
 
 DEFAULT_STORE = '/tmp/test_ds'
 
diff --git a/tests/testutils.py b/tests/testutils.py
index fc667db..2f3e7ff 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -8,3 +8,21 @@ def tmpData(data):
     os.close(fd)
     return fn
 
+def blit(data, path=None):
+    if not path: return tmpData(data)
+    fp = open(path, 'w')
+    fp.write(data)
+    fp.close()
+    return path
+
+
+# Search result set handlers
+def expect(r, count=None):
+    if count is not None: assert r[1] == count
+    return list(r[0])
+def expect_single(r):
+    assert r[1] == 1
+    return r[0].next()
+def expect_none(r):
+    assert r[1] == 0
+    assert list(r[0]) == []
diff --git a/tests/xapianindex.txt b/tests/xapianindex.txt
index 22aa05d..b28ac90 100644
--- a/tests/xapianindex.txt
+++ b/tests/xapianindex.txt
@@ -81,6 +81,8 @@ But an OR query for missing values still returns nothing.
 ...                                   'audio/ogg'])))
 
+
+Partial search...
+>>> assert expect_single(im.search(r'''pee*''')).id == uid
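
The expect helpers added to testutils.py give the (results, count)
pairs returned by search and find a uniform shape in the doctests. In
use they read like the sketch below, with im standing in for the
IndexManager from the xapianindex.txt session and the queries and
counts invented for illustration:

    from testutils import expect, expect_single, expect_none

    hits = expect(im.search('title:foo'), count=2)  # assert count, get list
    hit = expect_single(im.search(r'''pee*'''))     # exactly one match
    expect_none(im.search('nosuchterm'))            # zero matches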