diff options
-rwxr-xr-x | bin/datastore-service | 3 | ||||
-rw-r--r-- | bin/test-trial3.py | 202 | ||||
-rw-r--r-- | docs/requirements.txt | 110 | ||||
-rw-r--r-- | src/olpc/datastore/backingstore.py | 1 | ||||
-rw-r--r-- | src/olpc/datastore/config.py | 13 | ||||
-rw-r--r-- | src/olpc/datastore/datastore.py | 145 | ||||
-rw-r--r-- | src/olpc/datastore/hg_backingstore.py | 124 | ||||
-rw-r--r-- | src/olpc/datastore/lru.py | 81 | ||||
-rw-r--r-- | src/olpc/datastore/model.py | 31 | ||||
-rw-r--r-- | src/olpc/datastore/sxattr.py | 11 | ||||
-rw-r--r-- | src/olpc/datastore/utils.py | 3 | ||||
-rw-r--r-- | src/olpc/datastore/xapianindex.py | 15 | ||||
-rw-r--r-- | tests/simple_versions.txt | 4 | ||||
-rw-r--r-- | tests/test_hgrepo.py | 7 |
14 files changed, 659 insertions, 91 deletions
diff --git a/bin/datastore-service b/bin/datastore-service index c84e314..c2a29fa 100755 --- a/bin/datastore-service +++ b/bin/datastore-service @@ -3,7 +3,7 @@ import sys, os, signal import gobject import dbus.service import dbus.mainloop.glib -from olpc.datastore import DataStore, DS_LOG_CHANNEL, backingstore +from olpc.datastore import DataStore, DS_LOG_CHANNEL, backingstore, hg_backingstore import logging # Path handling @@ -48,6 +48,7 @@ bus = dbus.SessionBus() ds = DataStore() ds.registerBackend(backingstore.FileBackingStore) ds.registerBackend(backingstore.InplaceFileBackingStore) +ds.registerBackend(hg_backingstore.HgBackingStore) ds.mount(repo_dir) ds.complete_indexing() diff --git a/bin/test-trial3.py b/bin/test-trial3.py new file mode 100644 index 0000000..76594c8 --- /dev/null +++ b/bin/test-trial3.py @@ -0,0 +1,202 @@ +import dbus +import os +import shutil +import popen2 +import tempfile + +DEFAULT_STORE = "/tmp/store1" + +def _create_temp(content): + pt, pp = tempfile.mkstemp(suffix=".txt") + os.write(pt, content) + os.close(pt) + return pp + + +def _create_temp_odt(content): + # This just isn't working on my system + # first write the contents to a temp file, then convert it + pt, pp = tempfile.mkstemp(suffix=".txt") + os.write(pt, content) + os.close(pt) + + f, temp_path = tempfile.mkstemp(suffix='.odt') + del f + # my abiword didn't support fd://0 + cmd = 'abiword --to=odt --to-name=%s %s' % (temp_path, pp) + print cmd + child_stdout, child_stdin, child_stderr = popen2.popen3(cmd) + child_stdin.write(content) + child_stdin.close() + child_stdout.close() + child_stderr.close() + os.unlink(pp) + + return temp_path + +def start(): + if os.path.exists(DEFAULT_STORE): + os.system("rm -rf %s" % DEFAULT_STORE) + + bus = dbus.SessionBus() + bobj = bus.get_object("org.laptop.sugar.DataStore", + "/org/laptop/sugar/DataStore") + + + ds = dbus.Interface(bobj, dbus_interface='org.laptop.sugar.DataStore') + + mp = ds.mount("hg:%s" % DEFAULT_STORE, + 
dict(title="Primary Storage")) + return ds, mp + +def stop(ds): + ds.stop() + + + +ds, mp = start() +# Browse starts download +print "Download Simulation" +props = {'title': 'Downloading test.pdf from \nhttp://example.org/test.pdf.', + 'mime_type': 'application/pdf', + 'progress': '0', + 'mountpoint' : mp, + } +uid, vid = ds.checkin(props, '') +print 'Created download: %s %s' % (uid, vid) +assert uid +assert vid == '1' +ds.complete_indexing() + +# Browse notifies the DS about the progress +props['uid'] = uid +for i in range(1, 5): + props['progress'] = str(i * 20) + props['vid'] = vid + uid, vid = ds.checkin(props, '') + print 'Updated download: %s %s %s' % (uid, vid, props['progress']) + assert uid + ds.complete_indexing() + +# Browse finishes the download +# Now assume we have a file called tests/test.pdf (which there is if +# this is run from the project root) +# Checkin Gives the file to the datastore, it no longer owns it and +# will be removed when the datastore is done with it +source_path = "tests/test.pdf" +dest_path = "/tmp/test.pdf" +# we don't want it to kill our test file so we copy it to /tmp +shutil.copy(source_path, dest_path) + +props['title'] = 'File test.pdf downloaded from\nhttp://example.org/test.pdf.' +props['progress'] = '100' +uid, vid = ds.checkin(props, dest_path) +ds.complete_indexing() +print 'Updated download with file: %s %s %s' % (uid, vid, props['progress']) +assert uid +#assert vid == '1' + + +# Check the DS has removed the file. 
+assert not os.path.exists(dest_path) + +# Journal displays the download +objects, count = ds.find({'title': 'downloaded', 'order_by' : ['-vid']}) +props = objects[0] +print 'Query returned: %s' % props['uid'], count +assert props['vid'] == vid, """%s != %s""" % (props['vid'], vid) # the last rev + +# Read resumes the entry +props, file_path = ds.checkout(uid, '', '', '', '') +print 'Entry checked out: %s %s\n%s' % (uid, file_path, props) +assert props +assert props['vid'] == vid +assert file_path + +# Read saves position and zoom level +props['position'] = '15' +props['zoom_level'] = '150' +props['activity'] = 'org.laptop.sugar.Read' + + +uid, nvid = ds.checkin(props, file_path) +print 'Updated Read state: %s %s' % (uid, nvid) +assert uid +assert nvid != "1" +assert nvid > vid, "%s < %s" %(nvid, vid) +ds.complete_indexing() + + +print "DONE" +#stop(ds) + +# test_writing +#ds, mp = start() +print "Writing test" +# Create empty entry +props = {'title': 'Write activity', 'mountpoint' : mp} +uid, vid = ds.checkin(props, '') +print 'Created entry: %s %s' % (uid, vid) +assert uid +assert vid == '1' + +ds.complete_indexing() + +# First checkout +props, file_path = ds.checkout(uid, '', '', '', '') +print 'Entry checked out: %s %s' % (uid, file_path) +assert props +assert props['vid'] == '1' +assert file_path == '' + + +# Write first contents +file_path = _create_temp('blah blah 1') + +props['mountpoint'] = mp +props['mime_type'] = 'text/plain' +uid, vid = ds.checkin(props, file_path) +ds.complete_indexing() +print 'First checkin: %s %s %s' % (uid, vid, file_path) +assert uid +#assert vid == '1' +assert not os.path.exists(file_path) + +# Second checkout +props, file_path = ds.checkout(uid, '', '', '', '') +print 'Entry checked out: %s %s' % (props['uid'], file_path) +assert props +#assert props['vid'] == '1' +assert file_path + +# Write second contents +file_path = _create_temp('blah blah 1\nbleh bleh 2') +props['mime_type'] = 'text/plain' +props['mountpoint'] = mp +uid, 
vid = ds.checkin(props, file_path) +ds.complete_indexing() +print 'Second checkin: %s %s %s' % (uid, vid, file_path) +assert uid +#assert vid == '2' +assert not os.path.exists(file_path) + +print "DONE" +#stop(ds) + + +# PREVIEW test +#ds, mp = start() +print "preview test" +props = {'title': 'Write activity', + 'preview': dbus.ByteArray('\123\456\789\000\123'), + 'mountpoint' : mp} +uid, vid = ds.checkin(props, '') +print 'Created entry: %s %s' % (uid, vid) +assert uid +assert vid == '1' + +props, file_path = ds.checkout(uid, '', '', '', '') +print props + +print "DONE" +#stop(ds) diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..6b6b538 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,110 @@ +Datastore +========= + +An outline of the basic requirements and design decisions of the +Datastore used in Sugar. + + +Description +----------- + +The Datastore (DS) is a context storage and indexing tool. It's +designed to provide efficient access to content while supplying both +browse and search models on top of this content. It does this +without demanding any notion of a content repository, location or a +data API from the consumer. Additionally the Datastore supports +recovery of older versions of content and supports both backups and +remote storage. + + +Constraints +------------ + +The system is designed to run in a hostile environment. Operation +runtime and space are both highly constrained and only limited CPU +time/power is available to complete tasks. + +1. Limited Memory +2. Limited Processing Power +3. Limited Storage +4. Attempt to limit number of writes to medium. +5. All interactions through the public interface should return as soon +as possible. There is a 10 second window available to DBus, failure to +return in this time causes errors. Failure to return in under 0.2 +seconds can result in the UI blocking. + +Point 5 seems a system design flaw to me.
The need for atomic +I/O bound operations is at odds with a completely asynchronous model +and the caller shouldn't call into such a system in a way that would make it +block. + + + +Versioning/Storage +------------------ + +The datastore needs to remain efficient in terms of both space and +time while delivering content in a reliable fashion. Additionally +it's designed to function in an environment where we hope to +minimize the number of writes. + +1. Recovery of previous versions of documents +2. Efficient Storage + Should work with both normal text and binary data +3. Should allow the archival of old versions removing the need to +store the entire version history. +4. Should support synchronization with remote stores. + + +Searching/Indexing +------------------ + +The DS provides searchable metadata and content indexing for the vast +majority of content on the system. + + +1. Searches should reflect immediate operations immediately. (Even though +the operations happen asynchronously). + +2. Fulltext searching of content should be possible and +accessible. Even through historic versions. + +3. Fulltext should support stemming of common terms in a given +language. + + + +Archival/Backups +---------------- + +The system should provide a model for long term storage and backup +support. Old versions of content should be migrated to a long term +storage. The system should provide a way to identify content no longer +needed by the runtime when: + 1. Connected to a remote store + 2. Connected to the school server + 3. Space is limited + 4. System is idle + +The system should identify content subject to backups. Begin a remote +transaction with the storage repo. Migrate the old versions over an +SSH connection and then remove the old versions and the index +information for them from the local store. + + + +Remote Repositories +------------------- + +The DS is capable of mounting additional stores and having them +function as a single unit.
This can extend from USB devices to remote +network based storage (through the use of SSH). + +If the model is SSH based then the remote stores don't require a +active server runtime, but will have increased latency for common +operations as indexes must be loaded on a per-request basis. Counting +on the remote OS to cache and manage this is an option, a TTL based +server start is an option, a per-user or per-machine server is also +possible. + + diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index 1a97133..87b1ef1 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -15,7 +15,6 @@ from datetime import datetime import gnomevfs import os import re -import subprocess import time import threading diff --git a/src/olpc/datastore/config.py b/src/olpc/datastore/config.py new file mode 100644 index 0000000..5a3483f --- /dev/null +++ b/src/olpc/datastore/config.py @@ -0,0 +1,13 @@ + +# the name used by the logger +DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' + +# DBus config +DS_SERVICE = "org.laptop.sugar.DataStore" +DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore" +DS_OBJECT_PATH = "/org/laptop/sugar/DataStore" + + +# XAttr namespace +XATTR_NAMESPACE = "datastore" + diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py index 4ca13b6..0a37f49 100644 --- a/src/olpc/datastore/datastore.py +++ b/src/olpc/datastore/datastore.py @@ -17,13 +17,10 @@ import dbus.service import dbus.mainloop.glib from olpc.datastore import utils +from olpc.datastore import lru +from olpc.datastore.sxattr import Xattr +from olpc.datastore.config import * -# the name used by the logger -DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' - -DS_SERVICE = "org.laptop.sugar.DataStore" -DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore" -DS_OBJECT_PATH = "/org/laptop/sugar/DataStore" logger = logging.getLogger(DS_LOG_CHANNEL) @@ -36,6 +33,9 @@ class DataStore(dbus.service.Object): self.backends = [] 
self.mountpoints = {} self.root = None + + # maps uids to the mountpoint of the tip revision + self._mpcache = lru.LRU(20) # global handle to the main look dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) @@ -172,7 +172,7 @@ class DataStore(dbus.service.Object): def _resolveMountpoint(self, mountpoint=None): if isinstance(mountpoint, dict): mountpoint = mountpoint.pop('mountpoint', None) - + if mountpoint is not None: # this should be the id of a mount point mp = self.mountpoints[mountpoint] @@ -181,6 +181,61 @@ class DataStore(dbus.service.Object): mp = self.root return mp + + def _mountpointFor(self, uid): + # XXX: this is flawed in that we really need to resolve merge + # cases where objects exist in branches over two or more + # stores + # (and this have the same rev for different heads) + + # first, is it in the LRU cache? + if uid in self._mpcache: + return self._mpcache[uid] + + # attempt to resolve (and cache the mount point) + # this is the start of some merge code + on = [] + for mp in self.mountpoints.itervalues(): + try: + if "versions" in mp.capabilities: + c = mp.get(uid, allow_many=True) + if c: on.append((mp, c)) + else: + c = mp.get(uid) + if c: on.append((mp, c)) + except KeyError: + pass + + if on: + # find the single most recent revision + def versionCmp(x, y): + mpa, ca = x # mp, content + mpb, cb = y + # first by revision + r = cmp(int(ca.get_property('vid')), + int(cb.get_property('vid'))) + if r != 0: return r + # if they have the same revision we've detected a + # branch + # we should resolve the common parent in a merge case, + # etc. + # XXX: here we defer to time + return cmp(ca.get_property('mtime', 0), cb.get_property('mtime', 0)) + + + if len(on) > 1: + on.sort(versionCmp) + # the first mount point should be the most recent + # revision + mp = on[0][0] + else: + # No store has this uid. 
Doesn't mean it doesn't exist, + # just that its not mounted + mp = None + + self._mpcache[uid] = mp + return mp + # PUBLIC API @dbus.service.method(DS_DBUS_INTERFACE, in_signature='a{sv}s', @@ -192,11 +247,27 @@ class DataStore(dbus.service.Object): This method returns the uid and version id tuple. """ - mp = self._resolveMountpoint(props) + mp = None + # dbus cleaning + props = utils._convert(props) + + if filelike is not None: + filelike = str(filelike) + if filelike: + # attempt to scan the xattr namespace for information that can + # allow us to process this request more quickly + x = Xattr(filelike, XATTR_NAMESPACE) + known = x.asDict() + if "mountpoint" not in props and "mountpoint" in known: + mp = self._resolveMountpoint(known['mountpoint']) + + if not mp: + mp = self._resolveMountpoint(props) + if "versions" not in mp.capabilities: - uid = props.get('uid') vid = "1" # we don't care about vid on # non-versioning stores + uid = props.get('uid') if uid: # this is an update operation # and uid refers to the single object @@ -208,8 +279,9 @@ class DataStore(dbus.service.Object): else: # use the unified checkin on the backingstore uid, vid = mp.checkin(props, filelike) - - return uid, vid + + self._mpcache[uid] = mp + return uid, str(vid) @dbus.service.method(DS_DBUS_INTERFACE, @@ -223,7 +295,18 @@ class DataStore(dbus.service.Object): histories of the same object. 
""" ## XXX: review this when repo merge code is in place - mp = self._resolveMountpoint(mountpoint) + + # dbus cleanup + uid = str(uid) + vid = vid and str(vid) or None + mountpoint = mountpoint and str(mountpoint) or None + target = target and str(target) or None + dir = dir and str(dir) or None + + mp = self._mountpointFor(uid) + if not mp: + raise KeyError("Object with %s uid not found in datastore" % uid) + if "versions" not in mp.capabilities: content = mp.get(uid) props = content.properties @@ -310,7 +393,7 @@ class DataStore(dbus.service.Object): for hit in res: existing = d.get(hit.id) if not existing or \ - existing.get_property('mtime') < hit.get_property('mtime'): + existing.get_property('vid') < hit.get_property('vid'): # XXX: age/version check d[hit.id] = hit @@ -434,11 +517,11 @@ class DataStore(dbus.service.Object): return (d, len(results)) - def get(self, uid, mountpoint=None): + def get(self, uid, rev=None, mountpoint=None): mp = self._resolveMountpoint(mountpoint) c = None try: - c = mp.get(uid) + c = mp.get(uid, rev) if c: return c except KeyError: pass @@ -446,27 +529,36 @@ class DataStore(dbus.service.Object): if not c: for mp in self.mountpoints.itervalues(): try: - c = mp.get(uid) + if "versions" in mp.capabilities: + c = mp.get(uid, rev) + else: + c = mp.get(uid) + if c: break except KeyError: continue return c @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', + in_signature='sss', out_signature='s') - def get_filename(self, uid): - content = self.get(uid) + def get_filename(self, uid, vid=None, mountpoint=None): + vid = vid and str(vid) or None + mountpoint = mountpoint and str(mountpoint) or None + content = self.get(uid, vid, mountpoint) if content: try: return content.filename except AttributeError: pass return '' @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', + in_signature='sss', out_signature='a{sv}') - def get_properties(self, uid): - content = self.get(uid) + def get_properties(self, uid, vid=None, 
mountpoint=None): + vid = vid and str(vid) or None + mountpoint = mountpoint and str(mountpoint) or None + + content = self.get(uid, vid, mountpoint) props = content.properties props['mountpoint'] = content.backingstore.id return props @@ -510,7 +602,7 @@ class DataStore(dbus.service.Object): in_signature='ss', out_signature='') def delete(self, uid, mountpoint=None): - content = self.get(uid, mountpoint) + content = self.get(uid, mountpoint=mountpoint) if content: content.backingstore.delete(uid) self.Deleted(uid) @@ -520,8 +612,13 @@ class DataStore(dbus.service.Object): @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Deleted(self, uid): pass + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='', + out_signature='') def stop(self): - """shutdown the service""" + """shutdown the service. this is intended only for automated + testing or system shutdown.""" self.Stopped() self._connection.get_connection()._unregister_object_path(DS_OBJECT_PATH) for mp in self.mountpoints.values(): mp.stop() diff --git a/src/olpc/datastore/hg_backingstore.py b/src/olpc/datastore/hg_backingstore.py index 831da6c..7a4d1ba 100644 --- a/src/olpc/datastore/hg_backingstore.py +++ b/src/olpc/datastore/hg_backingstore.py @@ -2,14 +2,11 @@ from olpc.datastore.backingstore import FileBackingStore from olpc.datastore.sxattr import Xattr from olpc.datastore.utils import create_uid from olpc.datastore.bin_copy import bin_copy +from olpc.datastore.config import XATTR_NAMESPACE from mercurial import repo, filelog, transaction, util, revlog import os, sys, tempfile -# Xattr attribute namespace -NAMESPACE = "datastore" - - class localizedFilelog(filelog.filelog): def __init__(self, opener, path, cwd=None): @@ -34,7 +31,6 @@ class FileRepo(repo.repository): self.datadir = os.path.join(self.basepath, ".ds") if not os.path.exists(self.basepath): os.makedirs(self.basepath) - #os.chdir(self.basepath) def file(self, path): eopen = util.encodedopener(util.opener(self.datadir), 
util.encodefilename) @@ -99,13 +95,13 @@ class FileRepo(repo.repository): # most likely does) f = self.file(path) data = open(source, 'r').read() - tip = f.tip() if not parent: - x = Xattr(source, NAMESPACE) + x = Xattr(source, XATTR_NAMESPACE) # attempt to resolve the revision number from the property parent = x.get('revision') expected_uid = x.get('uid') + if parent: parent = int(parent) # from unicode else: @@ -118,9 +114,14 @@ class FileRepo(repo.repository): if isinstance(parent, int): # its a revision number, resolve it to a node id - parent = f.node(parent) - - if not f.cmp(parent, data): + try: + parent = f.node(parent) + except IndexError: + # XXX: log a warning, the parent passed in is + # invalid + # or... could have been archived I suppose + parent = None + if parent and not f.cmp(parent, data): # they are the same return @@ -163,17 +164,23 @@ class FileRepo(repo.repository): n = rev return l.read(n) - def dump(self, path, rev, target): + def dump(self, path, rev, target, mountpoint_id, changeset): """Dump the contents of a revision to the filename indicated by target""" fp = open(target, "w") fp.write(self.revision(path, rev)) fp.close() - # tag the checkout with its current revision - # this is used to aid in parent chaining on commits - x = Xattr(target, NAMESPACE) - x['revision'] = rev - x['uid'] = path # this is from the repo where the names are uids + # tag the checkout with its current revision this is used to + # aid in parent chaining on commits this is equivalent to the + # file_rev property in the model. 
+ # XXX: need to check for when xattr is not supported by the fs better + x = Xattr(target, XATTR_NAMESPACE) + x['revision'] = str(rev) + x['uid'] = path # this is from the repo where the names are + # uids + x['mountpoint'] = mountpoint_id # to quickly recover the mountpoint + # this came from + x['changeset'] = changeset def remove(self, path): """Hard remove the whole version history of an object""" @@ -238,7 +245,7 @@ class HgBackingStore(FileBackingStore): uid, rev = self.checkin(props, filelike) return uid - def get(self, uid, rev=None, env=None): + def get(self, uid, rev=None, env=None, allow_many=False): # we have a whole version chain, but get contracts to # return a single entry. In this case we default to 'tip' if not rev: @@ -246,7 +253,7 @@ class HgBackingStore(FileBackingStore): results, count = self.indexmanager.get_by_uid_prop(uid, rev) if count == 0: raise KeyError(uid) - elif count == 1: + elif count == 1 or allow_many: return results.next() raise ValueError("Got %d results for 'get' operation on %s" %(count, uid)) @@ -295,6 +302,7 @@ class HgBackingStore(FileBackingStore): # removed if rev is None: rev = self.repo.tip(uid) + c = self.get(uid, rev) self.indexmanager.delete(c.id) self.repo.strip(uid, rev) @@ -302,8 +310,8 @@ class HgBackingStore(FileBackingStore): def _targetFile(self, uid, target=None, ext=None, env=None): c = self.indexmanager.get(uid) rev = int(c.get_property('vid')) - rev -= 1 # adjust for 0 based counting - self.repo.dump(uid, rev, target) + #rev -= 1 # adjust for 0 based counting + self.repo.dump(uid, rev, target, self.id, c.get_property('changeset')) return open(target, 'rw') @@ -311,39 +319,58 @@ class HgBackingStore(FileBackingStore): """create or update the content object, creating a new version""" uid = props.get("uid") + c = None if uid is None: uid = create_uid() + props['vid'] = "1" else: # is there an existing object with this uid? # XXX: if there isn't it should it be an error? 
- r, count = self.indexmanager.get_by_uid_prop(uid, 'tip') - if count == 1: + r, count = self.indexmanager.get_by_uid_prop(uid, + props.get('vid', 'tip')) + if count: + # XXX: if count > 1 case should be handled c = r.next() # copy the value forward old_props = c.properties.copy() old_props.update(props) props = old_props - + # except vid which we increment here + props['vid'] = str(int(props['vid']) + 1) + props['uid'] = uid - if filelike: + if filelike: message = props.setdefault('message', 'initial') - parent = props.pop('parent', None) + # if a vid is passed in treat that as a parent revision + # because this is an interaction with the repo however we + # need to resolve that versions file_rev as the parent for + # 'put' + # it maybe that it didn't previously have a file at all in + # which case we must pass None + + # where c is the content lookup from before + parent = None + try: + parent = c.get_property('file_rev', None) + except: pass + rev, parent, isTip, changeset = self.repo.put(uid, filelike, parent, message, meta=dict(uid=uid)) # the create case is pretty simple # mark everything with defaults props['changeset'] = changeset - props['vid'] = str(rev) - + props['file_rev'] = str(rev) self.indexmanager.index(props, filelike) - return uid, rev + return uid, props['vid'] def checkout(self, uid, vid=None, target=None, dir=None): """checkout the object with this uid at vid (or HEAD if None). 
Returns (props, filename)""" # use the repo to drive the property search f = self.repo.file(uid) + exists = f.count() > 0 + if vid: vid = f.node(int(vid) -1) # base 0 counting else: @@ -352,22 +379,33 @@ class HgBackingStore(FileBackingStore): # there will only be one thing with the changeset id of this # 'f' m = f._readmeta(vid) - changeset = m['changeset'] - objs, count = self.indexmanager.search(dict(changeset=changeset)) - assert count == 1 - obj = objs.next() - - if not target: - target, ext = obj.suggestName() + changeset = m.get('changeset') + if changeset: + objs, count = self.indexmanager.search(dict(changeset=changeset)) + assert count == 1 + obj = objs.next() + elif not exists: + # There isn't any file content with this entry + objs = self.indexmanager.get_by_uid_prop(uid) + obj = objs[0].next() + + # we expect file content + if exists: if not target: - fd, fn = tempfile.mkstemp(dir=dir) - target = fn - - if not target.startswith('/'): - if dir: target = os.path.join(dir, target) - else: os.path.join('/tmp', target) - - self.repo.dump(uid, rev, target) + target, ext = obj.suggestName() + if not target: + fd, fn = tempfile.mkstemp(suffix=ext, dir=dir) + target = fn + os.close(fd) + + if not target.startswith('/'): + if not dir: dir = "/tmp" + target = os.path.join(dir, target) + + if target: + self.repo.dump(uid, rev, target, self.id, changeset) + + if not target: target = "" return obj.properties, target if __name__ == "__main__": diff --git a/src/olpc/datastore/lru.py b/src/olpc/datastore/lru.py new file mode 100644 index 0000000..6e0b6be --- /dev/null +++ b/src/olpc/datastore/lru.py @@ -0,0 +1,81 @@ +# LRU cache +# from http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/252524 + +class Node(object): + __slots__ = ['prev', 'next', 'me'] + def __init__(self, prev, me): + self.prev = prev + self.me = me + self.next = None + +class LRU: + """ + Implementation of a length-limited O(1) LRU queue. 
+ Built for and used by PyPE: + http://pype.sourceforge.net + Copyright 2003 Josiah Carlson. + """ + def __init__(self, count, pairs=[]): + self.count = max(count, 1) + self.d = {} + self.first = None + self.last = None + for key, value in pairs: + self[key] = value + def __contains__(self, obj): + return obj in self.d + def __getitem__(self, obj): + a = self.d[obj].me + self[a[0]] = a[1] + return a[1] + def __setitem__(self, obj, val): + if obj in self.d: + del self[obj] + nobj = Node(self.last, (obj, val)) + if self.first is None: + self.first = nobj + if self.last: + self.last.next = nobj + self.last = nobj + self.d[obj] = nobj + if len(self.d) > self.count: + if self.first == self.last: + self.first = None + self.last = None + return + a = self.first + a.next.prev = None + self.first = a.next + a.next = None + del self.d[a.me[0]] + del a + def __delitem__(self, obj): + nobj = self.d[obj] + if nobj.prev: + nobj.prev.next = nobj.next + else: + self.first = nobj.next + if nobj.next: + nobj.next.prev = nobj.prev + else: + self.last = nobj.prev + del self.d[obj] + def __iter__(self): + cur = self.first + while cur != None: + cur2 = cur.next + yield cur.me[1] + cur = cur2 + def iteritems(self): + cur = self.first + while cur != None: + cur2 = cur.next + yield cur.me + cur = cur2 + def iterkeys(self): + return iter(self.d) + def itervalues(self): + for i,j in self.iteritems(): + yield j + def keys(self): + return self.d.keys() diff --git a/src/olpc/datastore/model.py b/src/olpc/datastore/model.py index f857487..1e13a13 100644 --- a/src/olpc/datastore/model.py +++ b/src/olpc/datastore/model.py @@ -13,10 +13,11 @@ __license__ = 'The GNU Public License V2+' import datetime import mimetypes import os +import re import time import warnings from olpc.datastore.utils import timeparse - +import dbus # XXX: Open issues # list properties - Contributors (a, b, c) @@ -27,6 +28,9 @@ from olpc.datastore.utils import timeparse propertyTypes = {} _marker = object() +# used in cases 
where we convert title to a filename +titleSanitizer = re.compile("[\W]+") + def registerPropertyType(kind, get, set, xapian_sort_type=None, defaults=None, for_xapian=None, from_xapain=None): propertyTypes[kind] = PropertyImpl(get, set, xapian_sort_type, @@ -169,7 +173,7 @@ class Model(object): # Properties we don't automatically include in properties dict -EXCLUDED_PROPERTIES = ['fulltext', 'head', 'tip', 'changeset'] +EXCLUDED_PROPERTIES = ['fulltext', 'changeset', 'file_rev'] class Content(object): """A light weight proxy around Xapian Documents from secore. @@ -229,6 +233,7 @@ class Content(object): filename = self.get_property('filename', None) ext = self.get_property('ext', '') title = self.get_property('title', '') + title = titleSanitizer.sub("_", title) if filename: # some backingstores keep the full relative path @@ -294,8 +299,15 @@ def noop(value): return value import re base64hack = re.compile("(\S{212})") -def base64enc(value): return ' '.join(base64hack.split(value.encode('base64'))) -def base64dec(value): return value.replace(' ', '').decode('base64') +def base64enc(value): + if isinstance(value, list): + # its a bytearray + value = ''.join((str(v) for v in value)) + import pdb;pdb.set_trace() + return ' '.join(base64hack.split(value.encode('base64'))) + +def base64dec(value): + return value.replace(' ', '').decode('base64') DATEFORMAT = "%Y-%m-%dT%H:%M:%S" def date2string(value): return value.replace(microsecond=0).isoformat() @@ -337,9 +349,10 @@ registerPropertyType('text', noop, noop, 'string', {'store' : True, 'collapse' : True, }) -registerPropertyType('binary', noop, noop, None, {'store' : True, - 'exact' : False, - 'sortable' : False}) +# Now the convention is to directly use DBus.ByteArray +registerPropertyType('binary', str, dbus.ByteArray, None, {'store' : True, + 'exact' : False, + 'sortable' : False}) registerPropertyType('int', str, int, 'float', {'store' : True, 'exact' : True, @@ -368,8 +381,8 @@ defaultModel = Model().addFields( 
('vid', 'string'), # unique token per change to help disambiguate for merges ('changeset', 'string'), - #('head', 'int'), # is this a head revision - #('tip', 'int'), # is this the most recent head + ('file_rev', 'int'), # this is the revision of the file pointed to + # by this revision of the object ('checksum', 'string'), ('filename', 'string'), diff --git a/src/olpc/datastore/sxattr.py b/src/olpc/datastore/sxattr.py index 78a64f3..957395a 100644 --- a/src/olpc/datastore/sxattr.py +++ b/src/olpc/datastore/sxattr.py @@ -51,4 +51,13 @@ class Xattr(object): def keys(self): return list(self.iterkeys()) - + def asDict(self): + d = {} + for k in self.iterkeys(): + d[k] = self[k] + return d + + def update(self, dict): + for k,v in dict.iteritems(): + self[k] = v + diff --git a/src/olpc/datastore/utils.py b/src/olpc/datastore/utils.py index 0505463..e14461c 100644 --- a/src/olpc/datastore/utils.py +++ b/src/olpc/datastore/utils.py @@ -92,6 +92,9 @@ def _convert(arg): d[str(_convert(k))] = _convert(v) return d + if isinstance(arg, dbus.ByteArray): + return arg + if isinstance(arg, dbus.Array): a = [] for item in arg: diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index 84b3ebd..4212ad3 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -234,6 +234,7 @@ class IndexManager(object): doc.fields.append(secore.Field('fulltext', chunk)) self.write_index.replace(doc) + if versions and not inplace: # we know the source file is ours # to remove @@ -241,7 +242,7 @@ class IndexManager(object): logger.info("update file content %s:%s" % (uid, vid)) else: - logger.warning("""Conversion process failed for document %s %s""" % (uid, filename)) + logger.debug("""Conversion process failed for document %s %s""" % (uid, filename)) else: logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation)) @@ -349,8 +350,8 @@ class IndexManager(object): if self.versions: vid = props.get("vid") if not vid: - 
self.warn("Didn't supply needed versioning information" - " on a backingstore which performs versioning") + logger.warn("Didn't supply needed versioning information" + " on a backingstore which performs versioning") # each versions id is unique when using a versioning store doc.id = create_uid() else: @@ -405,12 +406,12 @@ class IndexManager(object): # when rev is passed only that particular rev is returne ri = self.read_index q = ri.query_field('uid', uid) - if rev: + if rev is not None: if rev == "tip": rev = self.backingstore.tip(uid) q = ri.query_filter(q, ri.query_field('vid', str(rev))) - results, count = self._search(q, 0, 1000) + results, count = self._search(q, 0, 1000, sortby="-vid") return results, count @@ -471,8 +472,8 @@ class IndexManager(object): return self._search(q, start_index, end_index) - def _search(self, q, start_index, end_index): - results = self.read_index.search(q, start_index, end_index) + def _search(self, q, start_index, end_index, sortby=None): + results = self.read_index.search(q, start_index, end_index, sortby=sortby) count = results.matches_estimated # map the result set to model.Content items diff --git a/tests/simple_versions.txt b/tests/simple_versions.txt index df6ce9b..d35882f 100644 --- a/tests/simple_versions.txt +++ b/tests/simple_versions.txt @@ -86,10 +86,10 @@ Lets check out the head version of this document now. >>> props, rev2 = ds.checkout(uid, dir='/tmp') -Check that the id is the same and the version id isn't. +Check that the id and vid are correct. >>> assert props['uid'] == uid ->>> assert props['vid'] != vid +>>> assert props['vid'] == vid Verify the contents of the file is as expected. 
diff --git a/tests/test_hgrepo.py b/tests/test_hgrepo.py index c67c506..0a05732 100644 --- a/tests/test_hgrepo.py +++ b/tests/test_hgrepo.py @@ -46,8 +46,8 @@ class Test(unittest.TestCase): co1 = os.path.join(DATA_1, "co1") co2 = os.path.join(DATA_1, "co2") - repo.dump("foo", 0, co1) - repo.dump("foo", 1, co2) + repo.dump("foo", 0, co1, '', '') + repo.dump("foo", 1, co2, '', '') assert open(co1, 'r').read() == TEXT_1 assert open(co2, 'r').read() == TEXT_2 @@ -75,7 +75,8 @@ class Test(unittest.TestCase): got = open(fn, 'r').read() assert got == d - uid, rev = bs.checkin(dict(uid=uid, title="B"), tmpData(d2)) + props['title'] = "B" + uid, rev = bs.checkin(props, tmpData(d2)) bs.complete_indexing() |