| author    | Benjamin Saller <bcsaller@objectrealms.net>      | 2007-08-16 07:32:18 (GMT) |
|-----------|--------------------------------------------------|---------------------------|
| committer | Benjamin Saller <bcsaller@objectrealms.net>      | 2007-08-16 07:32:18 (GMT) |
| commit    | 993b69294686830639299a62b844e2e9319b54b8 (patch)  |                           |
| tree      | 9c9200600eab03e813fd5f8632b73dbf1263cc81          |                           |
| parent    | 1c11e80f7d7ad68d62a29452fa71571ed838c6e9 (diff)   |                           |
wip
| -rw-r--r-- | src/olpc/datastore/backingstore.py    |  37 |
| -rw-r--r-- | src/olpc/datastore/datastore.py       |  92 |
| -rw-r--r-- | src/olpc/datastore/deltastream.py     |  45 |
| -rw-r--r-- | src/olpc/datastore/hg_backingstore.py | 369 |
| -rw-r--r-- | src/olpc/datastore/model.py           |  26 |
| -rw-r--r-- | src/olpc/datastore/sxattr.py          |  54 |
| -rw-r--r-- | src/olpc/datastore/xapianindex.py     | 198 |
| -rw-r--r-- | tests/mountpoints.txt                 |   5 |
| -rw-r--r-- | tests/simple_versions.txt             |  20 |
| -rw-r--r-- | tests/test_hgrepo.py                  | 114 |
| -rw-r--r-- | tests/testutils.py                    |   8 |
| -rw-r--r-- | tests/xapianindex.txt                 |   2 |
12 files changed, 828 insertions, 142 deletions
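The heart of the patch is the new `capabilities` marker on backing stores: `DataStore.update` and `checkout` now branch on whether a mount advertises `"versions"`, routing versioning stores through the unified `checkin`/`checkout` path. A minimal sketch of that dispatch follows; the `Mount` class is a hypothetical stand-in for a real `BackingStore` subclass, not code from the patch:

```python
# Sketch of the capability dispatch this commit adds to DataStore.update.
# Mount is a hypothetical stand-in for a BackingStore subclass such as
# FileBackingStore or HgBackingStore.
class Mount(object):
    capabilities = ("file",)  # HgBackingStore advertises ("file", "versions")

    def create(self, props, filelike):
        return "some-uid"

    def update(self, uid, props, filelike):
        pass

    def checkin(self, props, filelike):
        return "some-uid", "2"


def update(mp, props, filelike):
    # version-blind stores keep the old create/update split;
    # versioning stores route everything through checkin
    if "versions" not in mp.capabilities:
        uid = props.get('uid')
        vid = "1"  # vid carries no meaning without versioning
        if uid:
            mp.update(uid, props, filelike)
        else:
            uid = mp.create(props, filelike)
    else:
        uid, vid = mp.checkin(props, filelike)
    return uid, vid
```

One caveat worth noting: in the patch below, `FileBackingStore` declares `capabilities = ("file")`, which is a plain string rather than a one-element tuple; the `in` tests still pass by substring accident, but the tuple form `("file",)` used in the sketch is what the design intends.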
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index 724e7cc..8c93098 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -27,6 +27,18 @@ from olpc.datastore import utils
 # changing this pattern impacts _targetFile
 filename_attempt_pattern = re.compile('\(\d+\)$')
 
+# capabilities
+# list of strings on a backing store that indicate it provides an
+# implementation of some feature another layer can expect
+# "file" indicates that the store can return traditional filelike
+# objects and is the baseline capability expected of most other layers
+# "inplace" indicates that the backingstore will update/index the
+# files as they reside on the store. Files will not be moved.
+# "versions" indicates that the store will keep revisions of content
+# internally. This has implications at the search layer which may
+# perform different operations on stores that exhibit this capability
+
+
 class BackingStore(object):
     """Backing stores manage stable storage. We abstract out the
     management of file/blob storage through this class, as well as the
@@ -39,6 +51,8 @@ class BackingStore(object):
     olpc.datastore.model are provided.
 
     """
+    capabilities = ()
+
     def __init__(self, uri, **kwargs):
         """The kwargs are used to configure the backend so it can
         provide its interface. See specific backends for details
@@ -127,6 +141,8 @@ class FileBackingStore(BackingStore):
     STORE_NAME = "store"
     INDEX_NAME = "index"
     DESCRIPTOR_NAME = "metainfo"
+
+    capabilities = ("file")
     
     def __init__(self, uri, **kwargs):
         """ FileSystemStore(path=<root of managed storage>)
@@ -446,7 +462,8 @@ class InplaceFileBackingStore(FileBackingStore):
 
     """
     STORE_NAME = ".olpc.store"
-    
+    capabilities = ("file", "inplace")
+    
     def __init__(self, uri, **kwargs):
         # remove the 'inplace:' scheme
         uri = uri[len('inplace:'):]
@@ -495,7 +512,8 @@ class InplaceFileBackingStore(FileBackingStore):
                     source = os.path.join(dirpath, fn)
                     relative = source[len(self.uri)+1:]
-                    
+                    source = os.path.abspath(source)
+                    
                     result, count = self.indexmanager.search(dict(filename=relative))
                     mime_type = gnomevfs.get_mime_type(source)
                     stat = os.stat(source)
@@ -517,7 +535,7 @@ class InplaceFileBackingStore(FileBackingStore):
                         # happen)
                         content = result.next()
                         uid = content.id
-                        saved_mtime = content.get_property('mtime')
+                        saved_mtime = content.get_property('mtime', None)
                         if mtime != saved_mtime:
                             self.update(uid, metadata, source)
         self.indexmanager.flush()
@@ -528,13 +546,6 @@ class InplaceFileBackingStore(FileBackingStore):
         except KeyError: return None
         return os.path.join(self.uri, content.get_property('filename', uid))
-##     def _targetFile(self, uid, target=None, ext=None, env=None):
-##         # in this case the file should really be there unless it was
-##         # deleted in place or something which we typically isn't
-##         # allowed
-##         # XXX: catch this case and remove the index
-##         targetpath = self._translatePath(uid)
-##         return open(targetpath, 'rw')
 
     # File Management API
     def create(self, props, filelike):
@@ -542,17 +553,13 @@
         # don't touch it
         proposed_name = None
         if filelike:
-            if isinstance(filelike, basestring):
-                # lets treat it as a filename
-                filelike = open(filelike, "r")
-            filelike.seek(0)
             # usually with USB drives and the like the file we are
             # indexing is already on it, however in the case of moving
             # files to these devices we need to detect this case and
             # place the file
            proposed_name = props.get('filename', None)
            if not proposed_name:
-                proposed_name = os.path.split(filelike.name)[1]
+                proposed_name = os.path.split(filelike)[1]
             # record the name before qualifying it to the store
             props['filename'] = proposed_name
             proposed_name = os.path.join(self.uri, proposed_name)
diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py
index f3a465f..1fde4c7 100644
--- a/src/olpc/datastore/datastore.py
+++ b/src/olpc/datastore/datastore.py
@@ -193,27 +193,80 @@ class DataStore(dbus.service.Object):
         This method returns the uid and version id tuple.
         """
         mp = self._resolveMountpoint(props)
-        uid = props.get('uid')
-        if uid:
-            # this is an update operation
-            pass
+        if "versions" not in mp.capabilities:
+            uid = props.get('uid')
+            vid = "1" # we don't care about vid on
+                      # non-versioning stores
+            if uid:
+                # this is an update operation
+                # and uid refers to the single object
+                # so an update is ok
+                mp.update(uid, props, filelike)
+            else:
+                # this is a create operation
+                uid = mp.create(props, filelike)
         else:
-            # this is a create operation
-            pass
+            # use the unified checkin on the backingstore
+            uid, vid = mp.checkin(props, filelike)
+
+        return uid, vid
 
     @dbus.service.method(DS_DBUS_INTERFACE,
-                         in_signature='ss',
+                         in_signature='sss',
                          out_signature='a{sv}s')
-    def checkout(self, uid, vid=None):
+    def checkout(self, uid, vid=None, mountpoint=None):
         """Check out a revision of a document. Returns the properties
         of that version and a filename with the contents of that
-        version.
+        version. Generally calls to this should have the mountpoint
+        supplied as their may be different repositories with unmerged
+        histories of the same object.
         """
-        pass
-
-
+        ## XXX: review this when repo merge code is in place
+        mp = self._resolveMountpoint(mountpoint)
+        if "versions" not in mp.capabilities:
+            content = mp.get(uid)
+            props = content.properties
+            props['mountpoint'] = content.backingstore.id
+            filename = content.filename
+            return props, filename
+        else:
+            return mp.checkout(uid, vid)
+
+    @dbus.service.method(DS_DBUS_INTERFACE,
+                         in_signature='ssss',
+                         out_signature='a{sv}s')
+    def copy(self, uid, vid=None, mountpoint=None, target_mountpoint=None):
+        # attempt to copy an object from one mount to another
+        # if the uid/vid pair is not found a KeyError is raised
+        # if the mount points don't exist a KeyError is raised
+        # if both stores support versioning then the whole revision
+        # history is copied, if not this behaves as a checkout on the
+        # store
+        mp = self._resolveMountpoint(mountpoint)
+        mp2 = self._resolveMountpoint(target_mountpoint)
+        if not mp:raise KeyError("No mount %s" % mountpoint)
+        if not mp2:raise KeyError("No mount %s" % target_mountpoint)
+
+        vs = "versions" in mp.capabilities
+        vt = "versions" in mp2.capabilities
+        content = mp.get(uid)
+        props = content.properties
+
+        if not vs or not vt:
+            del props['uid']
+            filename = content.filename
+            uid = mp2.create(props, filename)
+            return uid, '1'
+        else:
+            # in this case we can copy the whole version history
+            mp2.raw_copy(mp.raw_sources())
+            # this creates an extra checkout, but as long as its not
+            # an inplace store it should be cleaner on index
+            mp2.update(props, content.filename)
+
+
     # OLD API
     @dbus.service.method(DS_DBUS_INTERFACE,
                          in_signature='a{sv}s',
                          out_signature='s')
@@ -358,7 +411,9 @@ class DataStore(dbus.service.Object):
         for r in results:
             props = {}
             props.update(r.properties)
-
+
+            # on versioning stores uid will be different
+            # than r.id but must be set
             if 'uid' not in props:
                 props['uid'] = r.id
 
@@ -379,8 +434,8 @@ class DataStore(dbus.service.Object):
 
         return (d, len(results))
 
-    def get(self, uid):
-        mp = self._resolveMountpoint()
+    def get(self, uid, mountpoint=None):
+        mp = self._resolveMountpoint(mountpoint)
         c = None
         try:
             c = mp.get(uid)
@@ -452,15 +507,16 @@ def Updated(self, uid): pass
 
     @dbus.service.method(DS_DBUS_INTERFACE,
-                         in_signature='s',
+                         in_signature='ss',
                          out_signature='')
-    def delete(self, uid):
-        content = self.get(uid)
+    def delete(self, uid, mountpoint=None):
+        content = self.get(uid, mountpoint)
         if content:
             content.backingstore.delete(uid)
         self.Deleted(uid)
         logger.debug("deleted %s" % uid)
 
+
     @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
     def Deleted(self, uid): pass
diff --git a/src/olpc/datastore/deltastream.py b/src/olpc/datastore/deltastream.py
deleted file mode 100644
index f880a07..0000000
--- a/src/olpc/datastore/deltastream.py
+++ /dev/null
@@ -1,45 +0,0 @@
-"""
-deltastream
-~~~~~~~~~~~~~~~~~~~~
-A forward or backward stream of delta information used to manage file
-versions efficiently
-
-"""
-
-__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
-__docformat__ = 'restructuredtext'
-__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
-__license__  = 'The GNU Public License V2+'
-
-
-import bsdiff
-FULL = 1
-PATCH = 2
-
-class DeltaStream(object):
-    """Record and Reconstruct objects from a forward diff chain. When diff
-    size/distance from the original is over a threshold we record a
-    new version in its entirety
-    """
-
-    def _record(self, old_fn, new_fn):
-        od = open(old_fn, 'r').read()
-        nd = open(new_fn, 'r').read()
-
-        #XXX: This needs to be done more memory efficiently
-        patch = bsdiff.Patch(od, nd)
-        # XXX: again, memory inefficient
-        if len(str(patch)) < (len(nd) / 2.0):
-            # The patch is larger than some threshold, we want to
-            # record a new full version rather than a patch
-            return FULL, nd
-        else:
-            return PATCH, patch
-
-    def record(self, old_fn, new_fn):
-        mode, data = self._record(old_fn, new_fn)
-        if mode is FULL:
-            pass
-        elif mode is PATCH:
-            pass
-
diff --git a/src/olpc/datastore/hg_backingstore.py b/src/olpc/datastore/hg_backingstore.py
new file mode 100644
index 0000000..588e3f6
--- /dev/null
+++ b/src/olpc/datastore/hg_backingstore.py
@@ -0,0 +1,369 @@
+from olpc.datastore.backingstore import FileBackingStore
+from olpc.datastore.sxattr import Xattr
+from olpc.datastore.utils import create_uid
+from olpc.datastore.bin_copy import bin_copy
+
+from mercurial import repo, filelog, transaction, util, revlog
+import os, sys, tempfile
+
+# Xattr attribute namespace
+NAMESPACE = "datastore"
+
+
+
+class localizedFilelog(filelog.filelog):
+    def __init__(self, opener, path, cwd=None):
+        self._fullpath = os.path.realpath(cwd)
+        revlog.revlog.__init__(self, opener,
+                               self.encodedir(path + ".i"))
+    @property
+    def rindexfile(self): return os.path.join(self._fullpath, self.indexfile)
+
+    @property
+    def rdatafile(self): return os.path.join(self._fullpath, self.datafile)
+
+class FileRepo(repo.repository):
+    """A very simple file repository that functions without the need
+    for global changesets or a working copy.
+    """
+
+    def __init__(self, path):
+        self.basepath = path
+        self.root = os.path.realpath(self.basepath)
+        self.repo = "data"
+        self.datadir = os.path.join(self.basepath, ".ds")
+        if not os.path.exists(self.basepath):
+            os.makedirs(self.basepath)
+        #os.chdir(self.basepath)
+        self.wopener = util.opener(self.root)
+
+    def file(self, path):
+        eopen = util.encodedopener(util.opener('.ds'), util.encodefilename)
+        fl = localizedFilelog(eopen, path, cwd=self.datadir)
+        return fl
+
+
+    def __contains__(self, path):
+        """Is this path already managed by the repo?"""
+        f = self.file(path)
+        p = f.rindexfile
+        return os.path.exists(p)
+
+    def raw_copy(self, filenames):
+        # create localized repository versions of the raw data from
+        # another repo
+        # this doesn't do indexing or anything like that
+        for fn in filenames:
+            srcdir, n = os.path.split(fn)
+            target = self._rjoin(n)
+            bin_copy(fn, target)
+
+
+    def raw_sources(self):
+        # return list of filenames which must be copied
+        return [d for d in self._rdata() if d]
+
+    def _rjoin(self, path):
+        # repository path
+        return os.path.join(self.basepath, ".ds", path)
+
+    def _rdata(self, path):
+        """return the index and datafile names in the repository"""
+        f = self.file(path)
+        base = os.path.join(self.basepath, ".ds")
+        i = os.path.join(base, f.rindexfile)
+        d = os.path.join(base, f.rdatafile)
+        return (i and i or None, d and d or None)
+
+    def put(self, path, source, parent=None, text=None, meta=None):
+        """Create a new revision of the content indicated by
+        'path'. 'source' is the filename containing the data we wish
+        to commit. parent when present is the parent revision this
+        data comes from. When parent is not provided we first look at
+        the source file for the xattr 'user.datastore.revision', if
+        that is present we assume it is the parent revision of the
+        element in path. When absent we assume that this is a delta to
+        the tip.
+
+
+        @return rev, parent, parent is tip, changeset
+            - rev is this files new revision number
+            - parent is the id of the revision used as the parent
+            - parent_is_tip indicates if the parent was the most recent
+              head
+            - changeset is the random uid of this transaction used in
+              merges
+        """
+
+        # we don't have a real working copy model. The source file may
+        # exist in some path or location well outside the repo (and
+        # most likely does)
+        f = self.file(path)
+        data = open(source, 'r').read()
+
+        tip = f.tip()
+        if not parent:
+            x = Xattr(source, NAMESPACE)
+            # attempt to resolve the revision number from the property
+            parent = x.get('revision')
+            if parent:
+                parent = int(parent) # from unicode
+            else:
+                # there wasn't an attribute on the file
+                # this can happen if the file was created by the
+                # client or
+                # the filesystem didn't support xattr
+                # in this case we assume the revision is the tip
+                parent = tip
+
+        if isinstance(parent, int):
+            # its a revision number, resolve it to a node id
+            parent = f.node(parent)
+
+        if not f.cmp(parent, data):
+            # they are the same
+            return
+
+        # assume some default values for missing metadata
+        # changeset is important here. Files don't properly belong to
+        # change sets, but this uid is used to discriminate versions
+        # with identical revision numbers from different sources
+        changeset = create_uid()
+        if not meta: meta = {}
+        meta.setdefault('text', text and text or "automatic commit")
+        meta['changeset'] = changeset
+
+        # commit the data to the log
+        t = self.transaction()
+        rev = f.count() + 1
+        f.add(data, meta, t, rev, parent)
+        t.close()
+
+        return rev, parent, parent == tip, changeset
+
+    def transaction(self):
+        return transaction.transaction(sys.stderr.write, open, "journal")
+
+
+    def tip(self, path):
+        # return the revision id that is the tip for a given path
+        l = self.file(path)
+        return l.rev(l.tip())
+
+    def revision(self, path, rev):
+        """Given the path name return the data associated with the raw
+        revision"""
+        # we really need a less memory intensive way of doing this
+        # stream the data to stable media as it processes the delta
+        if path not in self: raise KeyError("%s is not managed by repo" % path)
+        l = self.file(path)
+        if isinstance(rev, int):
+            n = l.node(rev)
+        else:
+            n = rev
+        return l.read(n)
+
+    def dump(self, path, rev, target):
+        """Dump the contents of a revision to the filename indicated
+        by target"""
+        fp = open(target, "w")
+        fp.write(self.revision(path, rev))
+        fp.close()
+        # tag the checkout with its current revision
+        # this is used to aid in parent chaining on commits
+        x = Xattr(target, NAMESPACE)
+        x['revision'] = rev
+
+    def remove(self, path):
+        """Hard remove the whole version history of an object"""
+        i, d = self._rdata(path)
+        if i and os.path.exists(i):
+            os.unlink(i)
+        # for small files d will not exist as the data is inlined to
+        # the the index
+        if d and os.path.exists(d):
+            os.unlink(d)
+
+    def strip(self, path, rev):
+        """attempt to remove a given revision from the history of
+        path"""
+        f = self.file(path)
+        f.strip(rev, rev)
+
+
+class HgBackingStore(FileBackingStore):
+    """This backingstore for the datastore supports versioning by
+    keeping a barebones Mercurial repository under the hood
+    """
+    capabilities = ("file", "versions")
+
+    def __init__(self, uri, **kwargs):
+        # the internal handle to the HgRepo
+        self.repo = None
+        uri = uri[len('hg:'):]
+        super(HgBackingStore, self).__init__(uri, **kwargs)
+
+    @staticmethod
+    def parse(uri):
+        return uri.startswith("hg:")
+
+    def check(self):
+        if not os.path.exists(self.uri): return False
+        if not os.path.exists(self.base): return False
+        return True
+
+    def initialize(self):
+        super(FileBackingStore, self).initialize()
+        self.repo = FileRepo(self.base)
+
+    def load(self):
+        super(HgBackingStore, self).load()
+        # connect the repo
+        if not self.repo:
+            self.repo = FileRepo(self.base)
+
+    # File Management API
+    def create(self, props, filelike):
+        # generate the uid ourselves. we do this so we can track the
+        # uid and changeset info
+        # Add it to the index
+        uid = create_uid()
+        props['uid'] = uid
+        props.setdefault('message', 'initial')
+        uid, rev = self.checkin(props, filelike)
+        return uid
+
+    def get(self, uid, rev=None, env=None):
+        # we have a whole version chain, but get contracts to
+        # return a single entry. In this case we default to 'tip'
+        if not rev:
+            rev = self.repo.tip(uid)
+        results, count = self.indexmanager.get_by_uid_prop(uid, rev)
+        if count == 0:
+            raise KeyError(uid)
+        elif count == 1:
+            return results.next()
+
+        raise ValueError("Got %d results for 'get' operation on %s" %(count, uid))
+
+######################################################################
+# XXX: This whole policy is botched unless the journal grows an
+# #    interface to display other versions of the main document
+# #    which it doesn't have. If I do this then we don't see those
+# #    versions in the journal and have no means to access
+# #    them. For the time being we just index everything and let
+# #    date sort them out.
+######################################################################
+# #    recover the old records for this uid
+# #    update may rewrite/remove 1-n documents
+# #    depending on the number of heads and so on
+# #    this needs to be done under a single transaction
+# #    actions will be a set of commands passed to
+# #    xapianindex.enque
+# #    the loop will process the entire action set at once
+#
+# #    we need to pass head/tip tags from the parent to the child
+# #    as a result of the update
+# #    XXX: currently we are only going to index the HEADs and TIP
+# #    revisions in the repository (and those marked with KEEP).
+# #    This means that when we update
+# #    with these tags:
+# #        we can remove the old version from xapian
+# #    w/o these tags:
+# #        it gets a head tag, implying a branch
+# #
+# #    because the keep flag indicates content is needed to be kept
+# #    locally we have two real choices, either
+# #        move it forward with each revision
+# #        keep only the original tagged version
+# #        and index the new one as well (because it will have the
+# #        HEAD tag)
+##########################################################################
+    def update(self, uid, props, filelike):
+        props['uid'] = uid
+        uid, rev = self.checkin(props, filelike)
+        return uid
+
+
+    def delete(self, uid, rev=None):
+        # delete the object at 'uid', when no rev is passed tip is
+        # removed
+        if rev is None:
+            rev = self.repo.tip(uid)
+        c = self.get(uid, rev)
+        self.indexmanager.delete(c.id)
+        self.repo.strip(uid, rev)
+
+    def _targetFile(self, uid, target=None, ext=None, env=None):
+        c = self.indexmanager.get(uid)
+        rev = int(c.get_property('vid'))
+        rev -= 1 # adjust for 0 based counting
+        self.repo.dump(uid, rev, target)
+        return open(target, 'rw')
+
+
+    def checkin(self, props, filelike):
+        """create or update the content object, creating a new
+        version"""
+        uid = props.setdefault('uid', create_uid())
+        if filelike:
+            message = props.setdefault('message', 'initial')
+            parent = props.pop('parent', None)
+            rev, parent, isTip, changeset = self.repo.put(uid, filelike,
+                                                          parent, message,
+                                                          meta=dict(uid=uid))
+            # the create case is pretty simple
+            # mark everything with defaults
+            props['changeset'] = changeset
+            props['vid'] = str(rev)
+
+        self.indexmanager.index(props, filelike)
+        return uid, rev
+
+    def checkout(self, uid, vid=None, target=None, dir=None):
+        """checkout the object with this uid at vid (or HEAD if
+        None). Returns (props, filename)"""
+        # use the repo to drive the property search
+        f = self.repo.file(uid)
+        if vid:
+            vid = f.node(int(vid) -1) # base 0 counting
+        else:
+            vid = f.tip()
+        rev = f.rev(vid)
+        # there will only be one thing with the changeset id of this
+        # 'f'
+        m = f._readmeta(vid)
+        changeset = m['changeset']
+        objs, count = self.indexmanager.search(dict(changeset=changeset))
+        assert count == 1
+        obj = objs.next()
+
+        if not target:
+            target, ext = obj.suggestName()
+            if not target:
+                fd, fn = tempfile.mkstemp(dir=dir)
+                target = fn
+
+        if not target.startswith('/'):
+            if dir: target = os.path.join(dir, target)
+            else: os.path.join('/tmp', target)
+
+        self.repo.dump(uid, rev, target)
+        return obj.properties, target
+
+if __name__ == "__main__":
+    import rlcompleter2
+    rlcompleter2.setup(verbose=0)
+
+    TESTLOC = "/tmp/fltest"
+    os.system('rm -rf %s' % TESTLOC)
+
+    c = FileRepo(TESTLOC)
+
+    n = c.blit("foo", "this is a test")
+    m = c.blit("bar", "another test")
+
+
+    o = c.blit("foo", "\nanother line", mode="a")
+
+    c.revisions("foo")
diff --git a/src/olpc/datastore/model.py b/src/olpc/datastore/model.py
index 14bb4b9..746d727 100644
--- a/src/olpc/datastore/model.py
+++ b/src/olpc/datastore/model.py
@@ -169,7 +169,7 @@ class Model(object):
 
 
 # Properties we don't automatically include in properties dict
-EXCLUDED_PROPERTIES = ['fulltext', ]
+EXCLUDED_PROPERTIES = ['fulltext', 'head', 'tip', 'changeset']
 
 class Content(object):
     """A light weight proxy around Xapian Documents from secore.
@@ -227,7 +227,8 @@
         # checkout name
         filename = self.get_property('filename', None)
         ext = self.get_property('ext', '')
-        
+        title = self.get_property('title', '')
+        
         if filename:
             # some backingstores keep the full relative path
             filename = os.path.split(filename)[1]
@@ -242,7 +243,11 @@
             ext = mimetypes.guess_extension(mt)
             # .ksh is a strange ext for plain text
             if ext and ext == '.ksh': ext = '.txt'
+            if title: return title, ext
             if ext: return None, ext
+
+        if title: return title, None
+        
         return None, None
 
     def get_file(self):
@@ -268,9 +273,11 @@ class Content(object):
     @property
     def backingstore(self): return self._backingstore
-    
+
+    # id of this record (unique in index)
     @property
-    def id(self): return self._doc.id
+    def id(self):
+        return self._doc.id
 
     @property
     def data(self): return self._doc.data
@@ -295,6 +302,7 @@ def encode_datetime(value):
 
 def decode_datetime(value):
     # convert a float to a local datetime
+    if not value: return None
     return datetime.datetime.fromtimestamp(float(value)).isoformat()
 
 def datedec(value, dateformat=DATEFORMAT):
@@ -346,8 +354,16 @@ registerPropertyType('date', dateenc, datedec, 'float', {'store' : True,
 
 defaultModel = Model().addFields(
     ('fulltext', 'text'),
+    # unique to the object through out its history
+    ('uid', 'string'),
     # vid is version id
-    ('vid', 'number'),
+    # linear version id's don't imply direct lineage, only the relative merge order
+    ('vid', 'string'),
+    # unique token per change to help disambiguate for merges
+    ('changeset', 'string'),
+    #('head', 'int'), # is this a head revision
+    #('tip', 'int'), # is this the most recent head
+    ('checksum', 'string'),
     ('filename', 'string'),
     ('ext', 'string'),
     # its possible we don't store a filename, but
diff --git a/src/olpc/datastore/sxattr.py b/src/olpc/datastore/sxattr.py
new file mode 100644
index 0000000..78a64f3
--- /dev/null
+++ b/src/olpc/datastore/sxattr.py
@@ -0,0 +1,54 @@
+"""
+simplified xattr
+~~~~~~~~~~~~~~~~~~~~
+automatically manage prefixes into the xattr namespace
+
+"""
+
+__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
+__docformat__ = 'restructuredtext'
+__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
+__license__  = 'The GNU Public License V2+'
+
+
+import xattr
+
+class Xattr(object):
+    def __init__(self, filename, prefix, implicitUser=True):
+        self.filename = filename
+        self.prefix = []
+        if implicitUser: self.prefix.append('user')
+        self.prefix.append(prefix)
+        self.ns = '.'.join(self.prefix)
+        self.keyed = lambda x: '.'.join((self.ns, x))
+
+    def __getitem__(self, key):
+        v = xattr.getxattr(self.filename, self.keyed(key))
+        return v.decode('utf-8')
+
+    def __setitem__(self, key, value):
+        if isinstance(value, unicode):
+            value = value.encode("utf-8")
+        else:
+            value = str(value)
+        xattr.setxattr(self.filename, self.keyed(key), value)
+
+    def __delitem__(self, key):
+        xattr.removexattr(self.filename, self.keyed(key))
+
+    def get(self, key, default=None):
+        try:
+            return self[key]
+        except IOError:
+            return default
+
+    def iterkeys(self):
+        all = xattr.listxattr(self.filename)
+        for key in all:
+            if key.startswith(self.ns):
+                yield key[len(self.ns) + 1:]
+
+    def keys(self):
+        return list(self.iterkeys())
+
+
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
index d46e0c7..5f29feb 100644
--- a/src/olpc/datastore/xapianindex.py
+++ b/src/olpc/datastore/xapianindex.py
@@ -15,6 +15,7 @@ __license__  = 'The GNU Public License V2+'
 
 from Queue import Queue, Empty
 import logging
+import os
 import re
 import sys
 import time
@@ -23,6 +24,7 @@ import threading
 import warnings
 
 import secore
+import xapian as _xapian # we need to modify the QueryParser
 
 from olpc.datastore import model
 from olpc.datastore.converter import converter
@@ -127,7 +129,12 @@ class IndexManager(object):
         self.write_index.flush()
         self.read_index.reopen()
 
-
+    def enqueSequence(self, commands):
+        """Takes a sequence of arugments to the normal enque function
+        and executes them under a single lock/flush cycle
+        """
+        self.queue.put(commands)
+        
     def enque(self, uid, vid, doc, operation, filestuff=None):
         # here we implement the sync/async policy
         # we want to take create/update operations and
@@ -162,52 +169,76 @@ class IndexManager(object):
         # from the item in the repo as that will become our immutable
         # copy. Detect those cases and use the internal filename
         # property or backingstore._translatePath to get at it
+        versions = self.versions
+        inplace = self.inplace
+        
         while self.indexer_running:
             # include timeout here to ease shutdown of the thread
             # if this is a non-issue we can simply allow it to block
             try:
                 data = self.queue.get(True, 0.025)
-                uid, vid, doc, operation, filestuff = data
+                # when we enque a sequence of commands they happen
+                # under a single write lock pass through the loop and
+                # the changes become visible at once.
+                
+                if not isinstance(data[0], (list, tuple)):
+                    data = (data,)
             except Empty:
-                #time.sleep(1.0)
                 continue
 
             try:
                 with self._write_lock:
-                    if operation is DELETE:
-                        self.write_index.delete(uid)
-                        logger.info("deleted content %s" % (uid,))
-                    elif operation is UPDATE:
-                        # Here we handle the conversion of binary
-                        # documents to plain text for indexing. This is
-                        # done in the thread to keep things async and
-                        # latency lower.
-                        # we know that there is filestuff or it
-                        # wouldn't have been queued
-                        filename, mimetype = filestuff
-                        if isinstance(filename, file):
-                            filename = filename.name
-                        fp = converter(filename, mimetype)
-                        if fp:
-                            # read in at a fixed block size, try to
-                            # conserve memory. If this doesn't work
-                            # we can make doc.fields a generator
-                            while True:
-                                chunk = fp.read(2048)
-                                if not chunk: break
-                                doc.fields.append(secore.Field('fulltext', chunk))
+                    for item in data:
+                        uid, vid, doc, operation, filestuff = item
+
+                        if operation is DELETE:
+                            self.write_index.delete(uid)
+                            logger.info("deleted content %s" % (uid,))
+                        elif operation is UPDATE:
+                            # Here we handle the conversion of binary
+                            # documents to plain text for indexing. This is
+                            # done in the thread to keep things async and
+                            # latency lower.
+                            # we know that there is filestuff or it
+                            # wouldn't have been queued
+                            filename, mimetype = filestuff
+                            if isinstance(filename, file):
+                                filename = filename.name
+                            if filename and not os.path.exists(filename):
+                                # someone removed the file before
+                                # indexing
+                                # or the path is broken
+                                logger.warning("Expected file for"
+                                               " indexing at %s. Not"
+                                               " Found" % filename)
-                            self.write_index.replace(doc)
-                            logger.info("update file content %s:%s" % (uid, vid))
+                            fp = converter(filename, mimetype)
+                            if fp:
+                                # read in at a fixed block size, try to
+                                # conserve memory. If this doesn't work
+                                # we can make doc.fields a generator
+                                while True:
+                                    chunk = fp.read(2048)
+                                    if not chunk: break
+                                    doc.fields.append(secore.Field('fulltext', chunk))
+
+                                self.write_index.replace(doc)
+                                if versions and not inplace:
+                                    # we know the source file is ours
+                                    # to remove
+                                    os.unlink(filename)
+
+                                logger.info("update file content %s:%s" % (uid, vid))
+                            else:
+                                logger.warning("""Conversion process failed for document %s %s""" % (uid, filename))
                         else:
-                            logger.warning("""Conversion process failed for document %s %s""" % (uid, filename))
-                    else:
-                        logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation))
-                        
+                            logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation))
+
                     # tell the queue its complete
                     self.queue.task_done()
 
-                    # we do flush on each record now
+                    # we do flush on each record (or set for enque
+                    # sequences) now
                    self.flush()
             except:
                 logger.exception("Error in indexer")
@@ -267,34 +298,63 @@ class IndexManager(object):
 
         return d
 
+    @property
+    def versions(self):
+        if self.backingstore:
+            return "versions" in self.backingstore.capabilities
+        return False
+
+    @property
+    def inplace(self):
+        if self.backingstore:
+            return "inplace" in self.backingstore.capabilities
+        return False
+
+
     def index(self, props, filename=None):
         """Index the content of an object.
         Props must contain the following:
         key -> Property()
         """
         operation = UPDATE
+        #
         # Version handling
         #
-        # we implicitly create new versions of documents the version
-        # id should have been set by the higher level system
+        # are we doing any special handling for versions?
         uid = props.pop('uid', None)
-        vid = props.pop('vid', None)
+
         if not uid:
             uid = create_uid()
             operation = CREATE
-
-        if vid: vid = str(float(vid) + 1.0)
-        else: vid = "1.0"
 
         # Property mapping via model
-        props = self._mapProperties(props)
         doc = secore.UnprocessedDocument()
         add = doc.fields.append
-        fp = None
+
+        vid = None
+        if self.versions:
+            vid = props.get("vid")
+            if not vid:
+                self.warn("Didn't supply needed versioning information"
+                          " on a backingstore which performs versioning")
+            # each versions id is unique when using a versioning store
+            doc.id = create_uid()
+        else:
+            doc.id = uid
+
+        if not vid: vid = '1'
+        # on non-versioning stores this is redundant but on versioning
+        # stores it reference to the objects whole timeline
+        props['uid'] = uid
+        props['vid'] = vid
+
+        props = self._mapProperties(props)
+
         filestuff = None
         if filename:
             # enque async file processing
@@ -302,11 +362,11 @@
             # and open fp?
             mimetype = props.get("mime_type")
             mimetype = mimetype and mimetype.value or 'text/plain'
+
+            filename = os.path.abspath(filename)
             filestuff = (filename, mimetype)
 
-        doc.id = uid
-        add(secore.Field('vid', vid))
-        
+        
         #
         # Property indexing
         for k, prop in props.iteritems():
@@ -322,13 +382,26 @@
 
         # queue the document for processing
         self.enque(uid, vid, doc, operation, filestuff)
-        return uid
+        return doc.id
 
     def get(self, uid):
         doc = self.read_index.get_document(uid)
         if not doc: raise KeyError(uid)
         return model.Content(doc, self.backingstore, self.datamodel)
 
+    def get_by_uid_prop(self, uid, rev=None):
+        # versioning stores fetch objects by uid
+        # when rev is passed only that particular rev is returne
+        ri = self.read_index
+        q = ri.query_field('uid', uid)
+        if rev:
+            q = ri.query_filter(q, ri.query_field('vid', str(rev)))
+        results, count = self._search(q, 0, 1000)
+
+        return results, count
+
+
     def delete(self, uid):
         # does this need queuing?
         # the higher level abstractions have to handle interaction
@@ -381,8 +454,11 @@ class IndexManager(object):
             q = ri.query_composite(ri.OP_AND, queries)
         else:
             q = self.parse_query(query)
-            
-        results = ri.search(q, start_index, end_index)
+        
+        return self._search(q, start_index, end_index)
+    
+    def _search(self, q, start_index, end_index):
+        results = self.read_index.search(q, start_index, end_index)
         count = results.matches_estimated
 
         # map the result set to model.Content items
@@ -423,6 +499,12 @@ class IndexManager(object):
         # 'title:foo' match a document whose title contains 'foo'
         # 'title:"A tale of two datastores"' exact title match
         # '-this that' match that w/o this
+
+        # limited support for wildcard searches
+        qp = _xapian.QueryParser
+
+        flags = (qp.FLAG_LOVEHATE)
+        
         ri = self.read_index
         start = 0
         end = len(query)
@@ -450,11 +532,35 @@ class IndexManager(object):
                     #XXX: strip quotes or not here
                     #word = query[orig+1:qm.end(1)-1]
                     word = query[orig:qm.end(1)]
+                    # this is a phase modify the flags
+                    flags |= qp.FLAG_PHRASE
                     start = qm.end(1) + 1
                 if field:
                     queries.append(ri.query_field(field, word))
                 else:
-                    queries.append(ri.query_parse(word))
+                    if word.endswith("*"):
+                        flags |= qp.FLAG_WILDCARD
+                    q = self._query_parse(word, flags)
+                    
+                    queries.append(q)
         q = ri.query_composite(ri.OP_AND, queries)
         return q
+
+
+    
+    def _query_parse(self, word, flags=0, op=None):
+        # while newer secore do pass flags it doesn't allow control
+        # over them at the API level. We override here to support
+        # adding wildcard searching
+        ri = self.read_index
+        if op is None: op = ri.OP_AND
+        qp = ri._prepare_queryparser(None, None, op)
+        try:
+            return qp.parse_query(word, flags)
+        except _xapian.QueryParserError, e:
+            # If we got a parse error, retry without boolean operators (since
+            # these are the usual cause of the parse error).
+            return qp.parse_query(string, 0)
+
+
diff --git a/tests/mountpoints.txt b/tests/mountpoints.txt
index 45a359a..0f736be 100644
--- a/tests/mountpoints.txt
+++ b/tests/mountpoints.txt
@@ -147,12 +147,13 @@ primary store.
 
 >>> props['mountpoint'] = mountpoint
 >>> fn = ds.get_filename(uid)
+>>> del props['uid']
 >>> copyuid = ds.create(props, fn)
 
 >>> ds.complete_indexing()
 
->>> result, count = ds.find(dict(fulltext="four"))
+>>> result, count = ds.find(dict(query="four"))
 >>> assert count == 2
 
 We also need to test that we can copy from a normal store to an
@@ -177,7 +178,7 @@
 We also need to be sure that delete commands work on inplace mounts. We
 will delete the object from the datastore and then verify that the
 file is missing.
 
->>> ds.delete(pen_copy)
+>>> ds.delete(pen_copy, mountpoint=mp3)
 >>> ds.complete_indexing()
 
 >>> os.path.exists('/tmp/store3/one.txt')
diff --git a/tests/simple_versions.txt b/tests/simple_versions.txt
index 9bf61d3..9be62bd 100644
--- a/tests/simple_versions.txt
+++ b/tests/simple_versions.txt
@@ -9,11 +9,12 @@ Let's create a simple datastore and add some content.
 
 >>> from olpc.datastore import DataStore
 >>> from olpc.datastore import backingstore
+>>> from olpc.datastore import hg_backingstore
 >>> from testutils import *
 
 >>> ds = DataStore()
->>> ds.registerBackend(backingstore.FileBackingStore)
->>> mp1 = ds.mount("/tmp/store1", dict(title="Primary Storage"))
+>>> ds.registerBackend(hg_backingstore.HgBackingStore)
+>>> mp1 = ds.mount("hg:/tmp/store1", dict(title="Primary Storage"))
 
 
 The checkin operation will create new content or update existing
@@ -48,11 +49,10 @@ To get a copy of this file out that we can manipulate we can use the
 checkout command. By default checkout will check out the HEAD revsion
 (the most recent) of a document. It returns the properties dictionary
 and the filename of the checkout which is ours to manipulate.
-
 >>> props, fn = ds.checkout(uid)
 
 >>> assert props['title'] == "A day in the life"
->>> assert props['vid'] == vid
+>>> assert props['vid'] == str(vid)
 
 >>> contents = open(fn, 'r').read()
 >>> assert contents.startswith("Part 1")
@@ -67,15 +67,14 @@ call after making our modifications and supplying our new file.
 (note that we changed the case of 'life' here)
 
 >>> props['title'] = "A day in the Life"
->>> ds.checkin(props, fn2)
+>>> uid, vid = ds.checkin(props, fn2)
 
 >>> ds.complete_indexing()
 
-Verify that the contents of the old version isn't returned in the
-search. By default old versions are not included.
+Old versions of content are still available.
 
->>> r, c = ds.find("begins"))
->>> assert c == 0
+>>> r, c = ds.find("begins")
+>>> assert c == 1
 
 Verify that the HEAD revision of the content is searchable by default.
 
@@ -89,7 +88,7 @@ Lets check out the head version of this document now.
 
 Check that the id is the same and the version id isn't.
 
->>> assert props['id'] == uid
+>>> assert props['uid'] == uid
 >>> assert props['vid'] != vid
 
 Verify the contents of the file is as expected.
@@ -98,7 +97,6 @@ Verify the contents of the file is as expected.
 
 >>> assert contents.startswith("Part Two")
 
-
 >>> ds.stop(); del ds
diff --git a/tests/test_hgrepo.py b/tests/test_hgrepo.py
new file mode 100644
index 0000000..c67c506
--- /dev/null
+++ b/tests/test_hgrepo.py
@@ -0,0 +1,114 @@
+import unittest
+from testutils import blit, tmpData
+
+from olpc.datastore import hg_backingstore
+import os
+
+DEFAULT_STORE = '/tmp/hgtest'
+DATA_1 = '/tmp/data1'
+DATA_2 = '/tmp/data2'
+
+class Test(unittest.TestCase):
+    def setUp(self):
+        os.system("rm -rf %s" % DEFAULT_STORE)
+        os.system("rm -rf %s" % DATA_1)
+        os.system("rm -rf %s" % DATA_2)
+
+        os.makedirs(DATA_1)
+        os.makedirs(DATA_2)
+
+    def tearDown(self):
+        os.system("rm -rf %s" % DEFAULT_STORE)
+        os.system("rm -rf %s" % DATA_1)
+        os.system("rm -rf %s" % DATA_2)
+
+    def test_hgrepo(self):
+        repo = hg_backingstore.FileRepo(DEFAULT_STORE)
+
+        # create a test file in DATA_1
+        TEXT_1= "this is a test"
+        fn1 = blit(TEXT_1, os.path.join(DATA_1, "s1"))
+        # and the version we will use later in DATA_2
+        # we do this to test tracking from different source dirs
+        TEXT_2 = "another test"
+        fn2 = blit(TEXT_2, os.path.join(DATA_2, "s2"))
+
+        # this should add foo to the repo with TEXT_1 data
+        repo.put("foo", fn1, text="initial")
+
+        # now commit fn2 to the same path, will create another
+        # revision
+        # of the existing data
+        repo.put("foo", fn2, text="updated")
+
+        # now verify access to both revisions and their data
+        # we check things out into DATA_1
+        co1 = os.path.join(DATA_1, "co1")
+        co2 = os.path.join(DATA_1, "co2")
+
+        repo.dump("foo", 0, co1)
+        repo.dump("foo", 1, co2)
+
+        assert open(co1, 'r').read() == TEXT_1
+        assert open(co2, 'r').read() == TEXT_2
+
+    def test_hgbs(self):
+        bs = hg_backingstore.HgBackingStore("hg:%s" % DEFAULT_STORE)
+        bs.initialize_and_load()
+        bs.create_descriptor()
+        desc = bs.descriptor()
+        assert 'id' in desc
+        assert 'uri' in desc
+        assert 'title' in desc
+        assert desc['title'] is not None
+
+        d = """This is a test"""
+        d2 = "Different"
+
+        uid, rev = bs.checkin(dict(title="A", filename="a.txt"), tmpData(d))
+
+        bs.complete_indexing()
+
+        props, fn = bs.checkout(uid)
+
+        assert props.get('title') == "A"
+        got = open(fn, 'r').read()
+        assert got == d
+
+        uid, rev = bs.checkin(dict(uid=uid, title="B"), tmpData(d2))
+
+        bs.complete_indexing()
+
+        props, fn = bs.checkout(uid)
+        assert props.get('title') == "B"
+        got = open(fn, 'r').read()
+        assert got == d2
+
+        # go back and check out the first version
+        props, fn = bs.checkout(uid, 1)
+        assert props.get('title') == "A"
+        got = open(fn, 'r').read()
+        assert got == d
+
+        bs.delete(uid, props['vid'])
+        bs.complete_indexing()
+
+        # There is no more revision 2
+        self.failUnlessRaises(KeyError, bs.get, uid, 1)
+
+##         props, fn = bs.checkout(uid)
+
+##         import pdb;pdb.set_trace()
+##         assert props.get('title') == "A"
+##         got = open(fn, 'r').read()
+##         assert got == d
+
+
+def test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(Test))
+    return suite
+
+if __name__ == "__main__":
+    unittest.main()
+
diff --git a/tests/testutils.py b/tests/testutils.py
index e81b22c..2f3e7ff 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -8,6 +8,14 @@ def tmpData(data):
     os.close(fd)
     return fn
 
+def blit(data, path=None):
+    if not path: return tmpData(data)
+    fp = open(path, 'w')
+    fp.write(data)
+    fp.close()
+    return path
+
+
 # Search result set handlers
 def expect(r, count=None):
     if count: assert r[1] == count
diff --git a/tests/xapianindex.txt b/tests/xapianindex.txt
index 22aa05d..b28ac90 100644
--- a/tests/xapianindex.txt
+++ b/tests/xapianindex.txt
@@ -81,6 +81,8 @@ But an OR query for missing values still return nothing.
 ...                                            'audio/ogg'])))
 
 
+Partial search...
+>>> assert expect_single(im.search(r'''pee*''')).id == uid
 
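The parent-chaining trick in `FileRepo.put`/`FileRepo.dump` rests on the new `sxattr.Xattr` wrapper: `dump` tags each checkout with its revision under the `user.datastore.` prefix, and a later `put` reads that tag back to pick the parent when the caller supplies none. A short sketch of the round trip, assuming an xattr-capable filesystem and the same `xattr` package the module imports (the path here is purely illustrative):

```python
from olpc.datastore.sxattr import Xattr

NAMESPACE = "datastore"  # same namespace constant hg_backingstore.py uses

# dump() records the revision on the checked-out file itself...
x = Xattr("/tmp/checkout.txt", NAMESPACE)
x['revision'] = 3            # stored as the 'user.datastore.revision' xattr

# ...so a later put() can recover the parent with no extra bookkeeping.
parent = x.get('revision')   # u'3' (or None if the attribute is missing)
if parent is not None:
    parent = int(parent)     # put() makes the same int() cast
```

Because the tag travels with the file rather than living in the index, a checkout that is edited and checked back in from anywhere still carries enough information to branch from the correct revision; when the filesystem drops the attribute, `put` simply falls back to the tip, exactly as the docstring describes.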