author     Benjamin Saller <bcsaller@objectrealms.net>   2007-08-16 07:32:18 (GMT)
committer  Benjamin Saller <bcsaller@objectrealms.net>   2007-08-16 07:32:18 (GMT)
commit     993b69294686830639299a62b844e2e9319b54b8
tree       9c9200600eab03e813fd5f8632b73dbf1263cc81
parent     1c11e80f7d7ad68d62a29452fa71571ed838c6e9
wip
 -rw-r--r--  src/olpc/datastore/backingstore.py    |  37
 -rw-r--r--  src/olpc/datastore/datastore.py       |  92
 -rw-r--r--  src/olpc/datastore/deltastream.py     |  45
 -rw-r--r--  src/olpc/datastore/hg_backingstore.py | 369
 -rw-r--r--  src/olpc/datastore/model.py           |  26
 -rw-r--r--  src/olpc/datastore/sxattr.py          |  54
 -rw-r--r--  src/olpc/datastore/xapianindex.py     | 198
 -rw-r--r--  tests/mountpoints.txt                 |   5
 -rw-r--r--  tests/simple_versions.txt             |  20
 -rw-r--r--  tests/test_hgrepo.py                  | 114
 -rw-r--r--  tests/testutils.py                    |   8
 -rw-r--r--  tests/xapianindex.txt                 |   2
 12 files changed, 828 insertions, 142 deletions
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index 724e7cc..8c93098 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -27,6 +27,18 @@ from olpc.datastore import utils
# changing this pattern impacts _targetFile
filename_attempt_pattern = re.compile('\(\d+\)$')
+# capabilities
+# list of strings on a backing store that indicate it provides an
+# implementation of some feature another layer can expect
+# "file" indicates that the store can return traditional filelike
+# objects and is the baseline capability expected of most other layers
+# "inplace" indicates that the backingstore will update/index the
+# files as they reside on the store. Files will not be moved.
+# "versions" indicates that the store will keep revisions of content
+# internally. This has implications at the search layer which may
+# perform different operations on stores that exhibit this capability
+
+
class BackingStore(object):
"""Backing stores manage stable storage. We abstract out the
management of file/blob storage through this class, as well as the
@@ -39,6 +51,8 @@ class BackingStore(object):
olpc.datastore.model are provided.
"""
+ capabilities = ()
+
def __init__(self, uri, **kwargs):
"""The kwargs are used to configure the backend so it can
provide its interface. See specific backends for details
@@ -127,6 +141,8 @@ class FileBackingStore(BackingStore):
STORE_NAME = "store"
INDEX_NAME = "index"
DESCRIPTOR_NAME = "metainfo"
+
+ capabilities = ("file")
def __init__(self, uri, **kwargs):
""" FileSystemStore(path=<root of managed storage>)
@@ -446,7 +462,8 @@ class InplaceFileBackingStore(FileBackingStore):
"""
STORE_NAME = ".olpc.store"
-
+ capabilities = ("file", "inplace")
+
def __init__(self, uri, **kwargs):
# remove the 'inplace:' scheme
uri = uri[len('inplace:'):]
@@ -495,7 +512,8 @@ class InplaceFileBackingStore(FileBackingStore):
source = os.path.join(dirpath, fn)
relative = source[len(self.uri)+1:]
-
+ source = os.path.abspath(source)
+
result, count = self.indexmanager.search(dict(filename=relative))
mime_type = gnomevfs.get_mime_type(source)
stat = os.stat(source)
@@ -517,7 +535,7 @@ class InplaceFileBackingStore(FileBackingStore):
# happen)
content = result.next()
uid = content.id
- saved_mtime = content.get_property('mtime')
+ saved_mtime = content.get_property('mtime', None)
if mtime != saved_mtime:
self.update(uid, metadata, source)
self.indexmanager.flush()
@@ -528,13 +546,6 @@ class InplaceFileBackingStore(FileBackingStore):
except KeyError: return None
return os.path.join(self.uri, content.get_property('filename', uid))
-## def _targetFile(self, uid, target=None, ext=None, env=None):
-## # in this case the file should really be there unless it was
-## # deleted in place or something which we typically isn't
-## # allowed
-## # XXX: catch this case and remove the index
-## targetpath = self._translatePath(uid)
-## return open(targetpath, 'rw')
# File Management API
def create(self, props, filelike):
@@ -542,17 +553,13 @@ class InplaceFileBackingStore(FileBackingStore):
# don't touch it
proposed_name = None
if filelike:
- if isinstance(filelike, basestring):
- # lets treat it as a filename
- filelike = open(filelike, "r")
- filelike.seek(0)
# usually with USB drives and the like the file we are
# indexing is already on it, however in the case of moving
# files to these devices we need to detect this case and
# place the file
proposed_name = props.get('filename', None)
if not proposed_name:
- proposed_name = os.path.split(filelike.name)[1]
+ proposed_name = os.path.split(filelike)[1]
# record the name before qualifying it to the store
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
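The capability strings documented at the top of this hunk are plain markers: a higher layer asks the mounted store what it supports instead of type-checking the backingstore class. A minimal sketch of that kind of dispatch, assuming only the classes in this file (the helper name is illustrative, not part of the patch):

    def describe_store(mount):
        caps = mount.capabilities
        # "file" is the baseline every store is expected to provide
        assert "file" in caps
        if "versions" in caps:
            # revision history is kept internally; callers use checkin/checkout
            return "versioning store at %s" % mount.uri
        if "inplace" in caps:
            # content is indexed where it already lives (e.g. a USB key), never moved
            return "inplace store at %s" % mount.uri
        return "plain file store at %s" % mount.uri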
diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py
index f3a465f..1fde4c7 100644
--- a/src/olpc/datastore/datastore.py
+++ b/src/olpc/datastore/datastore.py
@@ -193,27 +193,80 @@ class DataStore(dbus.service.Object):
This method returns the uid and version id tuple.
"""
mp = self._resolveMountpoint(props)
- uid = props.get('uid')
- if uid:
- # this is an update operation
- pass
+ if "versions" not in mp.capabilities:
+ uid = props.get('uid')
+ vid = "1" # we don't care about vid on
+ # non-versioning stores
+ if uid:
+ # this is an update operation
+ # and uid refers to the single object
+ # so an update is ok
+ mp.update(uid, props, filelike)
+ else:
+ # this is a create operation
+ uid = mp.create(props, filelike)
else:
- # this is a create operation
- pass
+ # use the unified checkin on the backingstore
+ uid, vid = mp.checkin(props, filelike)
+
return uid, vid
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='ss',
+ in_signature='sss',
out_signature='a{sv}s')
- def checkout(self, uid, vid=None):
+ def checkout(self, uid, vid=None, mountpoint=None):
"""Check out a revision of a document. Returns the properties
of that version and a filename with the contents of that
- version.
+ version. Generally calls to this should have the mountpoint
+ supplied as there may be different repositories with unmerged
+ histories of the same object.
"""
- pass
-
-
+ ## XXX: review this when repo merge code is in place
+ mp = self._resolveMountpoint(mountpoint)
+ if "versions" not in mp.capabilities:
+ content = mp.get(uid)
+ props = content.properties
+ props['mountpoint'] = content.backingstore.id
+ filename = content.filename
+ return props, filename
+ else:
+ return mp.checkout(uid, vid)
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='ssss',
+ out_signature='a{sv}s')
+ def copy(self, uid, vid=None, mountpoint=None, target_mountpoint=None):
+ # attempt to copy an object from one mount to another
+ # if the uid/vid pair is not found a KeyError is raised
+ # if the mount points don't exist a KeyError is raised
+ # if both stores support versioning then the whole revision
+ # history is copied, if not this behaves as a checkout on the
+ # store
+ mp = self._resolveMountpoint(mountpoint)
+ mp2 = self._resolveMountpoint(target_mountpoint)
+ if not mp:raise KeyError("No mount %s" % mountpoint)
+ if not mp2:raise KeyError("No mount %s" % target_mountpoint)
+
+ vs = "versions" in mp.capabilities
+ vt = "versions" in mp2.capabilities
+ content = mp.get(uid)
+ props = content.properties
+
+ if not vs or not vt:
+ del props['uid']
+ filename = content.filename
+ uid = mp2.create(props, filename)
+ return uid, '1'
+ else:
+ # in this case we can copy the whole version history
+ mp2.raw_copy(mp.raw_sources())
+ # this creates an extra checkout, but as long as its not
+ # an inplace store it should be cleaner on index
+ mp2.update(uid, props, content.filename)
+
+
+ # OLD API
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='a{sv}s',
out_signature='s')
@@ -358,7 +411,9 @@ class DataStore(dbus.service.Object):
for r in results:
props = {}
props.update(r.properties)
-
+
+ # on versioning stores uid will be different
+ # than r.id but must be set
if 'uid' not in props:
props['uid'] = r.id
@@ -379,8 +434,8 @@ class DataStore(dbus.service.Object):
return (d, len(results))
- def get(self, uid):
- mp = self._resolveMountpoint()
+ def get(self, uid, mountpoint=None):
+ mp = self._resolveMountpoint(mountpoint)
c = None
try:
c = mp.get(uid)
@@ -452,15 +507,16 @@ class DataStore(dbus.service.Object):
def Updated(self, uid): pass
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
+ in_signature='ss',
out_signature='')
- def delete(self, uid):
- content = self.get(uid)
+ def delete(self, uid, mountpoint=None):
+ content = self.get(uid, mountpoint)
if content:
content.backingstore.delete(uid)
self.Deleted(uid)
logger.debug("deleted %s" % uid)
+
@dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
def Deleted(self, uid): pass
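With these changes checkin, checkout, copy and delete all route on the mount's capabilities: a flat store falls back to create/update with a fixed version id of "1", while a versioning store delegates to its own checkin/checkout. A rough sketch of the intended usage in the style of the test documents; the paths, titles and the note file are made up for illustration and assumed to exist:

    >>> from olpc.datastore import DataStore
    >>> from olpc.datastore import backingstore, hg_backingstore
    >>> ds = DataStore()
    >>> ds.registerBackend(backingstore.FileBackingStore)
    >>> ds.registerBackend(hg_backingstore.HgBackingStore)
    >>> flat = ds.mount("/tmp/flat", dict(title="Flat store"))
    >>> vers = ds.mount("hg:/tmp/versioned", dict(title="Versioned store"))

    Each checkin on the hg-backed mount creates a new revision; on the flat
    mount the vid stays "1".

    >>> uid, vid = ds.checkin(dict(title="note", mountpoint=vers), "/tmp/note.txt")
    >>> ds.complete_indexing()
    >>> props, fn = ds.checkout(uid, vid, mountpoint=vers)

    Copying to a mount without "versions" degrades to a plain create of the
    checked-out revision.

    >>> ds.copy(uid, vid, mountpoint=vers, target_mountpoint=flat)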
diff --git a/src/olpc/datastore/deltastream.py b/src/olpc/datastore/deltastream.py
deleted file mode 100644
index f880a07..0000000
--- a/src/olpc/datastore/deltastream.py
+++ /dev/null
@@ -1,45 +0,0 @@
-"""
-deltastream
-~~~~~~~~~~~~~~~~~~~~
-A forward or backward stream of delta information used to manage file
-versions efficiently
-
-"""
-
-__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
-__docformat__ = 'restructuredtext'
-__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
-__license__ = 'The GNU Public License V2+'
-
-
-import bsdiff
-FULL = 1
-PATCH = 2
-
-class DeltaStream(object):
- """Record and Reconstruct objects from a forward diff chain. When diff
- size/distance from the original is over a threshold we record a
- new version in its entirety
- """
-
- def _record(self, old_fn, new_fn):
- od = open(old_fn, 'r').read()
- nd = open(new_fn, 'r').read()
-
- #XXX: This needs to be done more memory efficiently
- patch = bsdiff.Patch(od, nd)
- # XXX: again, memory inefficient
- if len(str(patch)) < (len(nd) / 2.0):
- # The patch is larger than some threshold, we want to
- # record a new full version rather than a patch
- return FULL, nd
- else:
- return PATCH, patch
-
- def record(self, old_fn, new_fn):
- mode, data = self._record(old_fn, new_fn)
- if mode is FULL:
- pass
- elif mode is PATCH:
- pass
-
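deltastream.py is dropped in favour of the Mercurial-backed store added below; it only ever sketched the idea of keeping a forward patch when the delta is cheap and a full copy once it grows too large. For reference, a minimal version of that decision using the bsdiff4 package (an assumed stand-in for the old bsdiff binding the removed module imported; note the removed code's comparison and its comment disagreed, and this sketch keeps a patch only when it is smaller than half the new content):

    import bsdiff4  # assumption: replaces the removed bsdiff module

    FULL, PATCH = 1, 2

    def record(old_fn, new_fn, threshold=0.5):
        old = open(old_fn, 'rb').read()
        new = open(new_fn, 'rb').read()
        patch = bsdiff4.diff(old, new)
        if len(patch) < len(new) * threshold:
            return PATCH, patch   # the delta is cheap enough to store
        return FULL, new          # otherwise snapshot the whole file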
diff --git a/src/olpc/datastore/hg_backingstore.py b/src/olpc/datastore/hg_backingstore.py
new file mode 100644
index 0000000..588e3f6
--- /dev/null
+++ b/src/olpc/datastore/hg_backingstore.py
@@ -0,0 +1,369 @@
+from olpc.datastore.backingstore import FileBackingStore
+from olpc.datastore.sxattr import Xattr
+from olpc.datastore.utils import create_uid
+from olpc.datastore.bin_copy import bin_copy
+
+from mercurial import repo, filelog, transaction, util, revlog
+import os, sys, tempfile
+
+# Xattr attribute namespace
+NAMESPACE = "datastore"
+
+
+
+class localizedFilelog(filelog.filelog):
+ def __init__(self, opener, path, cwd=None):
+ self._fullpath = os.path.realpath(cwd)
+ revlog.revlog.__init__(self, opener,
+ self.encodedir(path + ".i"))
+ @property
+ def rindexfile(self): return os.path.join(self._fullpath, self.indexfile)
+
+ @property
+ def rdatafile(self): return os.path.join(self._fullpath, self.datafile)
+
+class FileRepo(repo.repository):
+ """A very simple file repository that functions without the need
+ for global changesets or a working copy.
+ """
+
+ def __init__(self, path):
+ self.basepath = path
+ self.root = os.path.realpath(self.basepath)
+ self.repo = "data"
+ self.datadir = os.path.join(self.basepath, ".ds")
+ if not os.path.exists(self.basepath):
+ os.makedirs(self.basepath)
+ #os.chdir(self.basepath)
+ self.wopener = util.opener(self.root)
+
+ def file(self, path):
+ eopen = util.encodedopener(util.opener('.ds'), util.encodefilename)
+ fl = localizedFilelog(eopen, path, cwd=self.datadir)
+ return fl
+
+
+ def __contains__(self, path):
+ """Is this path already managed by the repo?"""
+ f = self.file(path)
+ p = f.rindexfile
+ return os.path.exists(p)
+
+ def raw_copy(self, filenames):
+ # create localized repository versions of the raw data from
+ # another repo
+ # this doesn't do indexing or anything like that
+ for fn in filenames:
+ srcdir, n = os.path.split(fn)
+ target = self._rjoin(n)
+ bin_copy(fn, target)
+
+
+ def raw_sources(self):
+ # return list of filenames which must be copied
+ return [d for d in self._rdata() if d]
+
+ def _rjoin(self, path):
+ # repository path
+ return os.path.join(self.basepath, ".ds", path)
+
+ def _rdata(self, path):
+ """return the index and datafile names in the repository"""
+ f = self.file(path)
+ base = os.path.join(self.basepath, ".ds")
+ i = os.path.join(base, f.rindexfile)
+ d = os.path.join(base, f.rdatafile)
+ return (i and i or None, d and d or None)
+
+ def put(self, path, source, parent=None, text=None, meta=None):
+ """Create a new revision of the content indicated by
+ 'path'. 'source' is the filename containing the data we wish
+ to commit. parent when present is the parent revision this
+ data comes from. When parent is not provided we first look at
+ the source file for the xattr 'user.datastore.revision', if
+ that is present we assume it is the parent revision of the
+ element in path. When absent we assume that this is a delta to
+ the tip.
+
+
+ @return rev, parent, parent is tip, changeset
+ - rev is this file's new revision number
+ - parent is the id of the revision used as the parent
+ - parent_is_tip indicates if the parent was the most recent
+ head
+ - changeset is the random uid of this transaction used in
+ merges
+ """
+
+ # we don't have a real working copy model. The source file may
+ # exist in some path or location well outside the repo (and
+ # most likely does)
+ f = self.file(path)
+ data = open(source, 'r').read()
+
+ tip = f.tip()
+ if not parent:
+ x = Xattr(source, NAMESPACE)
+ # attempt to resolve the revision number from the property
+ parent = x.get('revision')
+ if parent:
+ parent = int(parent) # from unicode
+ else:
+ # there wasn't an attribute on the file
+ # this can happen if the file was created by the
+ # client or
+ # the filesystem didn't support xattr
+ # in this case we assume the revision is the tip
+ parent = tip
+
+ if isinstance(parent, int):
+ # its a revision number, resolve it to a node id
+ parent = f.node(parent)
+
+ if not f.cmp(parent, data):
+ # they are the same
+ return
+
+ # assume some default values for missing metadata
+ # changeset is important here. Files don't properly belong to
+ # change sets, but this uid is used to discriminate versions
+ # with identical revision numbers from different sources
+ changeset = create_uid()
+ if not meta: meta = {}
+ meta.setdefault('text', text and text or "automatic commit")
+ meta['changeset'] = changeset
+
+ # commit the data to the log
+ t = self.transaction()
+ rev = f.count() + 1
+ f.add(data, meta, t, rev, parent)
+ t.close()
+
+ return rev, parent, parent == tip, changeset
+
+ def transaction(self):
+ return transaction.transaction(sys.stderr.write, open, "journal")
+
+
+ def tip(self, path):
+ # return the revision id that is the tip for a given path
+ l = self.file(path)
+ return l.rev(l.tip())
+
+ def revision(self, path, rev):
+ """Given the path name return the data associated with the raw
+ revision"""
+ # we really need a less memory intensive way of doing this
+ # stream the data to stable media as it processes the delta
+ if path not in self: raise KeyError("%s is not managed by repo" % path)
+ l = self.file(path)
+ if isinstance(rev, int):
+ n = l.node(rev)
+ else:
+ n = rev
+ return l.read(n)
+
+ def dump(self, path, rev, target):
+ """Dump the contents of a revision to the filename indicated
+ by target"""
+ fp = open(target, "w")
+ fp.write(self.revision(path, rev))
+ fp.close()
+ # tag the checkout with its current revision
+ # this is used to aid in parent chaining on commits
+ x = Xattr(target, NAMESPACE)
+ x['revision'] = rev
+
+ def remove(self, path):
+ """Hard remove the whole version history of an object"""
+ i, d = self._rdata(path)
+ if i and os.path.exists(i):
+ os.unlink(i)
+ # for small files d will not exist as the data is inlined to
+ # the index
+ if d and os.path.exists(d):
+ os.unlink(d)
+
+ def strip(self, path, rev):
+ """attempt to remove a given revision from the history of
+ path"""
+ f = self.file(path)
+ f.strip(rev, rev)
+
+
+class HgBackingStore(FileBackingStore):
+ """This backingstore for the datastore supports versioning by
+ keeping a barebones Mercurial repository under the hood
+ """
+ capabilities = ("file", "versions")
+
+ def __init__(self, uri, **kwargs):
+ # the internal handle to the HgRepo
+ self.repo = None
+ uri = uri[len('hg:'):]
+ super(HgBackingStore, self).__init__(uri, **kwargs)
+
+ @staticmethod
+ def parse(uri):
+ return uri.startswith("hg:")
+
+ def check(self):
+ if not os.path.exists(self.uri): return False
+ if not os.path.exists(self.base): return False
+ return True
+
+ def initialize(self):
+ super(FileBackingStore, self).initialize()
+ self.repo = FileRepo(self.base)
+
+ def load(self):
+ super(HgBackingStore, self).load()
+ # connect the repo
+ if not self.repo:
+ self.repo = FileRepo(self.base)
+
+ # File Management API
+ def create(self, props, filelike):
+ # generate the uid ourselves. we do this so we can track the
+ # uid and changeset info
+ # Add it to the index
+ uid = create_uid()
+ props['uid'] = uid
+ props.setdefault('message', 'initial')
+ uid, rev = self.checkin(props, filelike)
+ return uid
+
+ def get(self, uid, rev=None, env=None):
+ # we have a whole version chain, but get contracts to
+ # return a single entry. In this case we default to 'tip'
+ if not rev:
+ rev = self.repo.tip(uid)
+ results, count = self.indexmanager.get_by_uid_prop(uid, rev)
+ if count == 0:
+ raise KeyError(uid)
+ elif count == 1:
+ return results.next()
+
+ raise ValueError("Got %d results for 'get' operation on %s" %(count, uid))
+
+######################################################################
+# XXX: This whole policy is botched unless the journal grows an
+# # interface to display other versions of the main document
+# # which it doesn't have. If I do this then we don't see those
+# # versions in the journal and have no means to access
+# # them. For the time being we just index everything and let
+# # date sort them out.
+######################################################################
+# # recover the old records for this uid
+# # update may rewrite/remove 1-n documents
+# # depending on the number of heads and so on
+# # this needs to be done under a single transaction
+# # actions will be a set of commands passed to
+# # xapianindex.enque
+# # the loop will process the entire action set at once
+#
+# # we need to pass head/tip tags from the parent to the child
+# # as a result of the update
+# # XXX: currently we are only going to index the HEADs and TIP
+# # revisions in the repository (and those marked with KEEP).
+# # This means that when we update
+# # with these tags:
+# # we can remove the old version from xapian
+# # w/o these tags:
+# # it gets a head tag, implying a branch
+# #
+# # because the keep flag indicates content is needed to be kept
+# # locally we have two real choices, either
+# # move it forward with each revision
+# # keep only the original tagged version
+# # and index the new one as well (because it will have the
+# # HEAD tag)
+##########################################################################
+ def update(self, uid, props, filelike):
+ props['uid'] = uid
+ uid, rev = self.checkin(props, filelike)
+ return uid
+
+
+ def delete(self, uid, rev=None):
+ # delete the object at 'uid', when no rev is passed tip is
+ # removed
+ if rev is None:
+ rev = self.repo.tip(uid)
+ c = self.get(uid, rev)
+ self.indexmanager.delete(c.id)
+ self.repo.strip(uid, rev)
+
+ def _targetFile(self, uid, target=None, ext=None, env=None):
+ c = self.indexmanager.get(uid)
+ rev = int(c.get_property('vid'))
+ rev -= 1 # adjust for 0 based counting
+ self.repo.dump(uid, rev, target)
+ return open(target, 'rw')
+
+
+ def checkin(self, props, filelike):
+ """create or update the content object, creating a new
+ version"""
+ uid = props.setdefault('uid', create_uid())
+ if filelike:
+ message = props.setdefault('message', 'initial')
+ parent = props.pop('parent', None)
+ rev, parent, isTip, changeset = self.repo.put(uid, filelike,
+ parent, message,
+ meta=dict(uid=uid))
+ # the create case is pretty simple
+ # mark everything with defaults
+ props['changeset'] = changeset
+ props['vid'] = str(rev)
+
+ self.indexmanager.index(props, filelike)
+ return uid, rev
+
+ def checkout(self, uid, vid=None, target=None, dir=None):
+ """checkout the object with this uid at vid (or HEAD if
+ None). Returns (props, filename)"""
+ # use the repo to drive the property search
+ f = self.repo.file(uid)
+ if vid:
+ vid = f.node(int(vid) -1) # base 0 counting
+ else:
+ vid = f.tip()
+ rev = f.rev(vid)
+ # there will only be one thing with the changeset id of this
+ # 'f'
+ m = f._readmeta(vid)
+ changeset = m['changeset']
+ objs, count = self.indexmanager.search(dict(changeset=changeset))
+ assert count == 1
+ obj = objs.next()
+
+ if not target:
+ target, ext = obj.suggestName()
+ if not target:
+ fd, fn = tempfile.mkstemp(dir=dir)
+ target = fn
+
+ if not target.startswith('/'):
+ if dir: target = os.path.join(dir, target)
+ else: target = os.path.join('/tmp', target)
+
+ self.repo.dump(uid, rev, target)
+ return obj.properties, target
+
+if __name__ == "__main__":
+ # ad hoc smoke test against the FileRepo API defined above
+ TESTLOC = "/tmp/fltest"
+ os.system('rm -rf %s' % TESTLOC)
+
+ c = FileRepo(TESTLOC)
+
+ fd, fn = tempfile.mkstemp(); os.write(fd, "this is a test"); os.close(fd)
+ c.put("foo", fn, text="initial")
+
+ fd, fn = tempfile.mkstemp(); os.write(fd, "another test"); os.close(fd)
+ c.put("bar", fn, text="initial")
+
+ fd, fn = tempfile.mkstemp(); os.write(fd, "this is a test\nanother line"); os.close(fd)
+ c.put("foo", fn, text="appended")
+
+ c.dump("foo", c.tip("foo"), os.path.join(TESTLOC, "foo.out"))
diff --git a/src/olpc/datastore/model.py b/src/olpc/datastore/model.py
index 14bb4b9..746d727 100644
--- a/src/olpc/datastore/model.py
+++ b/src/olpc/datastore/model.py
@@ -169,7 +169,7 @@ class Model(object):
# Properties we don't automatically include in properties dict
-EXCLUDED_PROPERTIES = ['fulltext', ]
+EXCLUDED_PROPERTIES = ['fulltext', 'head', 'tip', 'changeset']
class Content(object):
"""A light weight proxy around Xapian Documents from secore.
@@ -227,7 +227,8 @@ class Content(object):
# checkout name
filename = self.get_property('filename', None)
ext = self.get_property('ext', '')
-
+ title = self.get_property('title', '')
+
if filename:
# some backingstores keep the full relative path
filename = os.path.split(filename)[1]
@@ -242,7 +243,11 @@ class Content(object):
ext = mimetypes.guess_extension(mt)
# .ksh is a strange ext for plain text
if ext and ext == '.ksh': ext = '.txt'
+ if title: return title, ext
if ext: return None, ext
+
+ if title: return title, None
+
return None, None
def get_file(self):
@@ -268,9 +273,11 @@ class Content(object):
@property
def backingstore(self): return self._backingstore
-
+
+ # id of this record (unique in index)
@property
- def id(self): return self._doc.id
+ def id(self):
+ return self._doc.id
@property
def data(self): return self._doc.data
@@ -295,6 +302,7 @@ def encode_datetime(value):
def decode_datetime(value):
# convert a float to a local datetime
+ if not value: return None
return datetime.datetime.fromtimestamp(float(value)).isoformat()
def datedec(value, dateformat=DATEFORMAT):
@@ -346,8 +354,16 @@ registerPropertyType('date', dateenc, datedec, 'float', {'store' : True,
defaultModel = Model().addFields(
('fulltext', 'text'),
+ # unique to the object through out its history
+ ('uid', 'string'),
# vid is version id
- ('vid', 'number'),
+ # linear version id's don't imply direct lineage, only the relative merge order
+ ('vid', 'string'),
+ # unique token per change to help disambiguate for merges
+ ('changeset', 'string'),
+ #('head', 'int'), # is this a head revision
+ #('tip', 'int'), # is this the most recent head
+
('checksum', 'string'),
('filename', 'string'),
('ext', 'string'), # its possible we don't store a filename, but
diff --git a/src/olpc/datastore/sxattr.py b/src/olpc/datastore/sxattr.py
new file mode 100644
index 0000000..78a64f3
--- /dev/null
+++ b/src/olpc/datastore/sxattr.py
@@ -0,0 +1,54 @@
+"""
+simplified xattr
+~~~~~~~~~~~~~~~~~~~~
+automatically manage prefixes into the xattr namespace
+
+"""
+
+__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
+__docformat__ = 'restructuredtext'
+__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
+__license__ = 'The GNU Public License V2+'
+
+
+import xattr
+
+class Xattr(object):
+ def __init__(self, filename, prefix, implicitUser=True):
+ self.filename = filename
+ self.prefix=[]
+ if implicitUser: self.prefix.append('user')
+ self.prefix.append(prefix)
+ self.ns = '.'.join(self.prefix)
+ self.keyed = lambda x: '.'.join((self.ns, x))
+
+ def __getitem__(self, key):
+ v = xattr.getxattr(self.filename, self.keyed(key))
+ return v.decode('utf-8')
+
+ def __setitem__(self, key, value):
+ if isinstance(value, unicode):
+ value = value.encode("utf-8")
+ else:
+ value = str(value)
+ xattr.setxattr(self.filename, self.keyed(key), value)
+
+ def __delitem__(self, key):
+ xattr.removexattr(self.filename, self.keyed(key))
+
+ def get(self, key, default=None):
+ try:
+ return self[key]
+ except IOError:
+ return default
+
+ def iterkeys(self):
+ all = xattr.listxattr(self.filename)
+ for key in all:
+ if key.startswith(self.ns):
+ yield key[len(self.ns) + 1:]
+
+ def keys(self):
+ return list(self.iterkeys())
+
+
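Xattr is a thin wrapper that namespaces keys under user.<prefix>.; hg_backingstore uses it to tag each checkout with the revision it came from so that a later put() can chain to the correct parent. A quick round-trip, assuming a filesystem with user extended attributes enabled (the path is a throwaway example):

    from olpc.datastore.sxattr import Xattr

    open("/tmp/example.txt", "w").close()
    x = Xattr("/tmp/example.txt", "datastore")  # keys become user.datastore.*
    x['revision'] = 3
    assert x.get('revision') == u'3'            # values round-trip as unicode
    assert x.keys() == ['revision']
    del x['revision']
    assert x.get('revision', 'missing') == 'missing'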
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
index d46e0c7..5f29feb 100644
--- a/src/olpc/datastore/xapianindex.py
+++ b/src/olpc/datastore/xapianindex.py
@@ -15,6 +15,7 @@ __license__ = 'The GNU Public License V2+'
from Queue import Queue, Empty
import logging
+import os
import re
import sys
import time
@@ -23,6 +24,7 @@ import threading
import warnings
import secore
+import xapian as _xapian # we need to modify the QueryParser
from olpc.datastore import model
from olpc.datastore.converter import converter
@@ -127,7 +129,12 @@ class IndexManager(object):
self.write_index.flush()
self.read_index.reopen()
-
+ def enqueSequence(self, commands):
+ """Takes a sequence of arugments to the normal enque function
+ and executes them under a single lock/flush cycle
+ """
+ self.queue.put(commands)
+
def enque(self, uid, vid, doc, operation, filestuff=None):
# here we implement the sync/async policy
# we want to take create/update operations and
@@ -162,52 +169,76 @@ class IndexManager(object):
# from the item in the repo as that will become our immutable
# copy. Detect those cases and use the internal filename
# property or backingstore._translatePath to get at it
+ versions = self.versions
+ inplace = self.inplace
+
while self.indexer_running:
# include timeout here to ease shutdown of the thread
# if this is a non-issue we can simply allow it to block
try:
data = self.queue.get(True, 0.025)
- uid, vid, doc, operation, filestuff = data
+ # when we enque a sequence of commands they are applied
+ # under a single write-lock pass through the loop and
+ # the changes become visible at once.
+
+ if not isinstance(data[0], (list, tuple)):
+ data = (data,)
except Empty:
- #time.sleep(1.0)
continue
try:
with self._write_lock:
- if operation is DELETE:
- self.write_index.delete(uid)
- logger.info("deleted content %s" % (uid,))
- elif operation is UPDATE:
- # Here we handle the conversion of binary
- # documents to plain text for indexing. This is
- # done in the thread to keep things async and
- # latency lower.
- # we know that there is filestuff or it
- # wouldn't have been queued
- filename, mimetype = filestuff
- if isinstance(filename, file):
- filename = filename.name
- fp = converter(filename, mimetype)
- if fp:
- # read in at a fixed block size, try to
- # conserve memory. If this doesn't work
- # we can make doc.fields a generator
- while True:
- chunk = fp.read(2048)
- if not chunk: break
- doc.fields.append(secore.Field('fulltext', chunk))
+ for item in data:
+ uid, vid, doc, operation, filestuff = item
+
+ if operation is DELETE:
+ self.write_index.delete(uid)
+ logger.info("deleted content %s" % (uid,))
+ elif operation is UPDATE:
+ # Here we handle the conversion of binary
+ # documents to plain text for indexing. This is
+ # done in the thread to keep things async and
+ # latency lower.
+ # we know that there is filestuff or it
+ # wouldn't have been queued
+ filename, mimetype = filestuff
+ if isinstance(filename, file):
+ filename = filename.name
+ if filename and not os.path.exists(filename):
+ # someone removed the file before
+ # indexing
+ # or the path is broken
+ logger.warning("Expected file for"
+ " indexing at %s. Not"
+ " Found" % filename)
- self.write_index.replace(doc)
- logger.info("update file content %s:%s" % (uid, vid))
+ fp = converter(filename, mimetype)
+ if fp:
+ # read in at a fixed block size, try to
+ # conserve memory. If this doesn't work
+ # we can make doc.fields a generator
+ while True:
+ chunk = fp.read(2048)
+ if not chunk: break
+ doc.fields.append(secore.Field('fulltext', chunk))
+
+ self.write_index.replace(doc)
+ if versions and not inplace:
+ # we know the source file is ours
+ # to remove
+ os.unlink(filename)
+
+ logger.info("update file content %s:%s" % (uid, vid))
+ else:
+ logger.warning("""Conversion process failed for document %s %s""" % (uid, filename))
else:
- logger.warning("""Conversion process failed for document %s %s""" % (uid, filename))
- else:
- logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation))
-
+ logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation))
+
# tell the queue its complete
self.queue.task_done()
- # we do flush on each record now
+ # we do flush on each record (or set for enque
+ # sequences) now
self.flush()
except:
logger.exception("Error in indexer")
@@ -267,34 +298,63 @@ class IndexManager(object):
return d
+ @property
+ def versions(self):
+ if self.backingstore:
+ return "versions" in self.backingstore.capabilities
+ return False
+
+ @property
+ def inplace(self):
+ if self.backingstore:
+ return "inplace" in self.backingstore.capabilities
+ return False
+
+
def index(self, props, filename=None):
"""Index the content of an object.
Props must contain the following:
key -> Property()
"""
operation = UPDATE
+
#
# Version handling
#
- # we implicitly create new versions of documents the version
- # id should have been set by the higher level system
+ # are we doing any special handling for versions?
uid = props.pop('uid', None)
- vid = props.pop('vid', None)
+
if not uid:
uid = create_uid()
operation = CREATE
- if vid: vid = str(float(vid) + 1.0)
- else: vid = "1.0"
# Property mapping via model
- props = self._mapProperties(props)
doc = secore.UnprocessedDocument()
add = doc.fields.append
- fp = None
+
+ vid = None
+ if self.versions:
+ vid = props.get("vid")
+ if not vid:
+ logger.warning("Didn't supply needed versioning information"
+ " on a backingstore which performs versioning")
+ # each versions id is unique when using a versioning store
+ doc.id = create_uid()
+ else:
+ doc.id = uid
+
+ if not vid: vid = '1'
+ # on non-versioning stores this is redundant but on versioning
+ # stores it refers to the object's whole timeline
+ props['uid'] = uid
+ props['vid'] = vid
+
+ props = self._mapProperties(props)
+
filestuff = None
if filename:
# enque async file processing
@@ -302,11 +362,11 @@ class IndexManager(object):
# and open fp?
mimetype = props.get("mime_type")
mimetype = mimetype and mimetype.value or 'text/plain'
+
+ filename = os.path.abspath(filename)
filestuff = (filename, mimetype)
- doc.id = uid
- add(secore.Field('vid', vid))
-
+
#
# Property indexing
for k, prop in props.iteritems():
@@ -322,13 +382,26 @@ class IndexManager(object):
# queue the document for processing
self.enque(uid, vid, doc, operation, filestuff)
- return uid
+ return doc.id
def get(self, uid):
doc = self.read_index.get_document(uid)
if not doc: raise KeyError(uid)
return model.Content(doc, self.backingstore, self.datamodel)
+ def get_by_uid_prop(self, uid, rev=None):
+ # versioning stores fetch objects by uid
+ # when rev is passed only that particular rev is returned
+ ri = self.read_index
+ q = ri.query_field('uid', uid)
+ if rev:
+ q = ri.query_filter(q, ri.query_field('vid', str(rev)))
+ results, count = self._search(q, 0, 1000)
+
+ return results, count
+
+
+
def delete(self, uid):
# does this need queuing?
# the higher level abstractions have to handle interaction
@@ -381,8 +454,11 @@ class IndexManager(object):
q = ri.query_composite(ri.OP_AND, queries)
else:
q = self.parse_query(query)
-
- results = ri.search(q, start_index, end_index)
+
+ return self._search(q, start_index, end_index)
+
+ def _search(self, q, start_index, end_index):
+ results = self.read_index.search(q, start_index, end_index)
count = results.matches_estimated
# map the result set to model.Content items
@@ -423,6 +499,12 @@ class IndexManager(object):
# 'title:foo' match a document whose title contains 'foo'
# 'title:"A tale of two datastores"' exact title match
# '-this that' match that w/o this
+
+ # limited support for wildcard searches
+ qp = _xapian.QueryParser
+
+ flags = (qp.FLAG_LOVEHATE)
+
ri = self.read_index
start = 0
end = len(query)
@@ -450,11 +532,35 @@ class IndexManager(object):
#XXX: strip quotes or not here
#word = query[orig+1:qm.end(1)-1]
word = query[orig:qm.end(1)]
+ # this is a phrase; modify the flags
+ flags |= qp.FLAG_PHRASE
start = qm.end(1) + 1
if field:
queries.append(ri.query_field(field, word))
else:
- queries.append(ri.query_parse(word))
+ if word.endswith("*"):
+ flags |= qp.FLAG_WILDCARD
+ q = self._query_parse(word, flags)
+
+ queries.append(q)
q = ri.query_composite(ri.OP_AND, queries)
return q
+
+
+
+ def _query_parse(self, word, flags=0, op=None):
+ # while newer versions of secore do pass flags, they don't allow control
+ # over them at the API level. We override here to support
+ # adding wildcard searching
+ ri = self.read_index
+ if op is None: op = ri.OP_AND
+ qp = ri._prepare_queryparser(None, None, op)
+ try:
+ return qp.parse_query(word, flags)
+ except _xapian.QueryParserError, e:
+ # If we got a parse error, retry without boolean operators (since
+ # these are the usual cause of the parse error).
+ return qp.parse_query(word, 0)
+
+
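The parser changes above reach beneath secore to pass QueryParser flags directly; that is what makes the trailing-star search added to tests/xapianindex.txt work. The flag handling in isolation, against the plain xapian bindings of this era (an in-memory database is used only so the snippet is self-contained; wildcard expansion needs a database to expand terms against):

    import xapian

    qp = xapian.QueryParser()
    qp.set_database(xapian.inmemory_open())

    word = "pee*"
    flags = xapian.QueryParser.FLAG_LOVEHATE
    if word.endswith("*"):
        flags |= xapian.QueryParser.FLAG_WILDCARD
    query = qp.parse_query(word, flags)
    print query   # the parsed Xapian::Query description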
diff --git a/tests/mountpoints.txt b/tests/mountpoints.txt
index 45a359a..0f736be 100644
--- a/tests/mountpoints.txt
+++ b/tests/mountpoints.txt
@@ -147,12 +147,13 @@ primary store.
>>> props['mountpoint'] = mountpoint
>>> fn = ds.get_filename(uid)
+>>> del props['uid']
>>> copyuid = ds.create(props, fn)
>>> ds.complete_indexing()
->>> result, count = ds.find(dict(fulltext="four"))
+>>> result, count = ds.find(dict(query="four"))
>>> assert count == 2
We also need to test that we can copy from a normal store to an
@@ -177,7 +178,7 @@ We also need to be sure that delete commands work on inplace
mounts. We will delete the object from the datastore and then verify
that the file is missing.
->>> ds.delete(pen_copy)
+>>> ds.delete(pen_copy, mountpoint=mp3)
>>> ds.complete_indexing()
>>> os.path.exists('/tmp/store3/one.txt')
diff --git a/tests/simple_versions.txt b/tests/simple_versions.txt
index 9bf61d3..9be62bd 100644
--- a/tests/simple_versions.txt
+++ b/tests/simple_versions.txt
@@ -9,11 +9,12 @@ Let's create a simple datastore and add some content.
>>> from olpc.datastore import DataStore
>>> from olpc.datastore import backingstore
+>>> from olpc.datastore import hg_backingstore
>>> from testutils import *
>>> ds = DataStore()
->>> ds.registerBackend(backingstore.FileBackingStore)
->>> mp1 = ds.mount("/tmp/store1", dict(title="Primary Storage"))
+>>> ds.registerBackend(hg_backingstore.HgBackingStore)
+>>> mp1 = ds.mount("hg:/tmp/store1", dict(title="Primary Storage"))
The checkin operation will create new content or update existing
@@ -48,11 +49,10 @@ To get a copy of this file out that we can manipulate we can use the
checkout command. By default checkout will check out the HEAD revision
(the most recent) of a document. It returns the properties dictionary
and the filename of the checkout which is ours to manipulate.
-
>>> props, fn = ds.checkout(uid)
>>> assert props['title'] == "A day in the life"
->>> assert props['vid'] == vid
+>>> assert props['vid'] == str(vid)
>>> contents = open(fn, 'r').read()
>>> assert contents.startswith("Part 1")
@@ -67,15 +67,14 @@ call after making our modifications and supplying our new file.
(note that we changed the case of 'life' here)
>>> props['title'] = "A day in the Life"
->>> ds.checkin(props, fn2)
+>>> uid, vid = ds.checkin(props, fn2)
>>> ds.complete_indexing()
-Verify that the contents of the old version isn't returned in the
-search. By default old versions are not included.
+Old versions of content are still available.
->>> r, c = ds.find("begins"))
->>> assert c == 0
+>>> r, c = ds.find("begins")
+>>> assert c == 1
Verify that the HEAD revision of the content is searchable by default.
@@ -89,7 +88,7 @@ Lets check out the head version of this document now.
Check that the id is the same and the version id isn't.
->>> assert props['id'] == uid
+>>> assert props['uid'] == uid
>>> assert props['vid'] != vid
Verify the contents of the file is as expected.
@@ -98,7 +97,6 @@ Verify the contents of the file is as expected.
>>> assert contents.startswith("Part Two")
-
>>> ds.stop(); del ds
diff --git a/tests/test_hgrepo.py b/tests/test_hgrepo.py
new file mode 100644
index 0000000..c67c506
--- /dev/null
+++ b/tests/test_hgrepo.py
@@ -0,0 +1,114 @@
+import unittest
+from testutils import blit, tmpData
+
+from olpc.datastore import hg_backingstore
+import os
+
+DEFAULT_STORE = '/tmp/hgtest'
+DATA_1 = '/tmp/data1'
+DATA_2 = '/tmp/data2'
+
+class Test(unittest.TestCase):
+ def setUp(self):
+ os.system("rm -rf %s" % DEFAULT_STORE)
+ os.system("rm -rf %s" % DATA_1)
+ os.system("rm -rf %s" % DATA_2)
+
+ os.makedirs(DATA_1)
+ os.makedirs(DATA_2)
+
+ def tearDown(self):
+ os.system("rm -rf %s" % DEFAULT_STORE)
+ os.system("rm -rf %s" % DATA_1)
+ os.system("rm -rf %s" % DATA_2)
+
+ def test_hgrepo(self):
+ repo = hg_backingstore.FileRepo(DEFAULT_STORE)
+
+ # create a test file in DATA_1
+ TEXT_1= "this is a test"
+ fn1 = blit(TEXT_1, os.path.join(DATA_1, "s1"))
+ # and the version we will use later in DATA_2
+ # we do this to test tracking from different source dirs
+ TEXT_2 = "another test"
+ fn2 = blit(TEXT_2, os.path.join(DATA_2, "s2"))
+
+ # this should add foo to the repo with TEXT_1 data
+ repo.put("foo", fn1, text="initial")
+
+ # now commit fn2 to the same path, will create another
+ # revision
+ # of the existing data
+ repo.put("foo", fn2, text="updated")
+
+ # now verify access to both revisions and their data
+ # we check things out into DATA_1
+ co1 = os.path.join(DATA_1, "co1")
+ co2 = os.path.join(DATA_1, "co2")
+
+ repo.dump("foo", 0, co1)
+ repo.dump("foo", 1, co2)
+
+ assert open(co1, 'r').read() == TEXT_1
+ assert open(co2, 'r').read() == TEXT_2
+
+ def test_hgbs(self):
+ bs = hg_backingstore.HgBackingStore("hg:%s" % DEFAULT_STORE)
+ bs.initialize_and_load()
+ bs.create_descriptor()
+ desc = bs.descriptor()
+ assert 'id' in desc
+ assert 'uri' in desc
+ assert 'title' in desc
+ assert desc['title'] is not None
+
+ d = """This is a test"""
+ d2 = "Different"
+
+ uid, rev = bs.checkin(dict(title="A", filename="a.txt"), tmpData(d))
+
+ bs.complete_indexing()
+
+ props, fn = bs.checkout(uid)
+
+ assert props.get('title') == "A"
+ got = open(fn, 'r').read()
+ assert got == d
+
+ uid, rev = bs.checkin(dict(uid=uid, title="B"), tmpData(d2))
+
+ bs.complete_indexing()
+
+ props, fn = bs.checkout(uid)
+ assert props.get('title') == "B"
+ got = open(fn, 'r').read()
+ assert got == d2
+
+ # go back and check out the first version
+ props, fn = bs.checkout(uid, 1)
+ assert props.get('title') == "A"
+ got = open(fn, 'r').read()
+ assert got == d
+
+ bs.delete(uid, props['vid'])
+ bs.complete_indexing()
+
+ # There is no more revision 2
+ self.failUnlessRaises(KeyError, bs.get, uid, 1)
+
+## props, fn = bs.checkout(uid)
+
+## import pdb;pdb.set_trace()
+## assert props.get('title') == "A"
+## got = open(fn, 'r').read()
+## assert got == d
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(Test))
+ return suite
+
+if __name__ == "__main__":
+ unittest.main()
+
diff --git a/tests/testutils.py b/tests/testutils.py
index e81b22c..2f3e7ff 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -8,6 +8,14 @@ def tmpData(data):
os.close(fd)
return fn
+def blit(data, path=None):
+ if not path: return tmpData(data)
+ fp = open(path, 'w')
+ fp.write(data)
+ fp.close()
+ return path
+
+
# Search result set handlers
def expect(r, count=None):
if count: assert r[1] == count
diff --git a/tests/xapianindex.txt b/tests/xapianindex.txt
index 22aa05d..b28ac90 100644
--- a/tests/xapianindex.txt
+++ b/tests/xapianindex.txt
@@ -81,6 +81,8 @@ But an OR query for missing values still return nothing.
... 'audio/ogg'])))
+Partial search...
+>>> assert expect_single(im.search(r'''pee*''')).id == uid