1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
|
from olpc.datastore.backingstore import FileBackingStore
from olpc.datastore.sxattr import Xattr
from olpc.datastore.utils import create_uid
from olpc.datastore.bin_copy import bin_copy
from olpc.datastore.config import XATTR_NAMESPACE
from mercurial import repo, filelog, transaction, util, revlog
import os, sys, tempfile
class localizedFilelog(filelog.filelog):
    """A mercurial filelog that remembers the absolute directory it
    lives in, so callers can resolve its index/data files on disk.
    """
    def __init__(self, opener, path, cwd=None):
        # Remember the absolute repository directory before handing
        # off to the revlog constructor.
        self._fullpath = os.path.realpath(cwd)
        revlog.revlog.__init__(self, opener, self.encodedir(path + ".i"))

    @property
    def rindexfile(self):
        """Absolute path of this filelog's index (.i) file."""
        return os.path.join(self._fullpath, self.indexfile)

    @property
    def rdatafile(self):
        """Absolute path of this filelog's data (.d) file."""
        return os.path.join(self._fullpath, self.datafile)
class FileRepo(repo.repository):
    """A very simple file repository that functions without the need
    for global changesets or a working copy.

    Each managed path (typically a datastore uid) is stored as an
    independent Mercurial filelog under <basepath>/.ds.
    """
    def __init__(self, path):
        self.basepath = path
        self.root = os.path.realpath(self.basepath)
        self.repo = "data"
        self.datadir = os.path.join(self.basepath, ".ds")
        if not os.path.exists(self.basepath):
            os.makedirs(self.basepath)

    def file(self, path):
        """Return the filelog used to store revisions of 'path'."""
        eopen = util.encodedopener(util.opener(self.datadir),
                                   util.encodefilename)
        fl = localizedFilelog(eopen, path, cwd=self.datadir)
        return fl

    def __contains__(self, path):
        """Is this path already managed by the repo?"""
        f = self.file(path)
        p = f.rindexfile
        return os.path.exists(p)

    def raw_copy(self, filenames):
        # create localized repository versions of the raw data from
        # another repo; this doesn't do indexing or anything like that
        for fn in filenames:
            srcdir, n = os.path.split(fn)
            target = self._rjoin(n)
            bin_copy(fn, target)

    def raw_sources(self):
        # return list of filenames which must be copied
        # NOTE(review): _rdata() takes a mandatory 'path' argument, so
        # this call raises TypeError as written -- confirm the
        # intended behavior before relying on this method.
        return [d for d in self._rdata() if d]

    def _rjoin(self, path):
        # map a bare filename into the repository data directory
        return os.path.join(self.basepath, ".ds", path)

    def _rdata(self, path):
        """return the index and datafile names in the repository"""
        f = self.file(path)
        base = os.path.join(self.basepath, ".ds")
        i = os.path.join(base, f.rindexfile)
        d = os.path.join(base, f.rdatafile)
        # both joins always yield non-empty strings, so the original
        # "x and x or None" guards were no-ops; return them directly
        return (i, d)

    def put(self, path, source, parent=None, text=None, meta=None):
        """Create a new revision of the content indicated by
        'path'. 'source' is the filename containing the data we wish
        to commit. parent when present is the parent revision this
        data comes from. When parent is not provided we first look at
        the source file for the xattr 'user.datastore.revision', if
        that is present we assume it is the parent revision of the
        element in path. When absent we assume that this is a delta to
        the tip.

        @return rev, parent, parent_is_tip, changeset
           - rev is this file's new revision number
           - parent is the id of the revision used as the parent
           - parent_is_tip indicates if the parent was the most recent
             head
           - changeset is the random uid of this transaction used in
             merges
        Returns None (commits nothing) when the data is identical to
        the resolved parent revision.
        """
        # we don't have a real working copy model. The source file may
        # exist in some path or location well outside the repo (and
        # most likely does)
        f = self.file(path)
        # read in binary mode and close the handle promptly (the
        # previous version leaked the open file object and used text
        # mode, which mangles binary data on some platforms)
        fp = open(source, 'rb')
        try:
            data = fp.read()
        finally:
            fp.close()
        tip = f.tip()
        if not parent:
            x = Xattr(source, XATTR_NAMESPACE)
            # attempt to resolve the revision number from the property
            parent = x.get('revision')
            expected_uid = x.get('uid')
            if parent:
                parent = int(parent)  # from unicode
            else:
                # there wasn't an attribute on the file; this can
                # happen if the file was created by the client or the
                # filesystem didn't support xattr -- in this case we
                # assume the revision is the tip
                parent = tip
        if isinstance(parent, int):
            # its a revision number, resolve it to a node id
            try:
                parent = f.node(parent)
            except IndexError:
                # XXX: log a warning, the parent passed in is invalid
                # or... could have been archived I suppose
                parent = None
        if parent and not f.cmp(parent, data):
            # they are the same
            return
        # assume some default values for missing metadata.
        # changeset is important here: files don't properly belong to
        # change sets, but this uid is used to discriminate versions
        # with identical revision numbers from different sources
        changeset = create_uid()
        if not meta:
            meta = {}
        # "text or ..." is equivalent to the old "text and text or ..."
        meta.setdefault('text', text or "automatic commit")
        meta['changeset'] = changeset
        # commit the data to the log
        t = self.transaction()
        rev = f.count() + 1
        f.add(data, meta, t, rev, parent)
        t.close()
        return rev, parent, parent == tip, changeset

    def transaction(self):
        # NOTE(review): the journal file is created relative to the
        # process cwd rather than the repository -- confirm intended
        return transaction.transaction(sys.stderr.write, open, "journal")

    def tip(self, path):
        # return the revision id that is the tip for a given path
        l = self.file(path)
        return l.rev(l.tip())

    def revision(self, path, rev):
        """Given the path name return the data associated with the raw
        revision. 'rev' may be an int revision number or a node id.
        """
        # we really need a less memory intensive way of doing this:
        # stream the data to stable media as it processes the delta
        if path not in self:
            raise KeyError("%s is not managed by repo" % path)
        l = self.file(path)
        if isinstance(rev, int):
            n = l.node(rev)
        else:
            n = rev
        return l.read(n)

    def dump(self, path, rev, target, mountpoint_id, changeset):
        """Dump the contents of a revision to the filename indicated
        by target, then tag the checkout with provenance xattrs."""
        fp = open(target, "w")
        try:
            fp.write(self.revision(path, rev))
        finally:
            # close even when revision() raises (the old code leaked
            # the handle on error)
            fp.close()
        # tag the checkout with its current revision; this is used to
        # aid in parent chaining on commits and is equivalent to the
        # file_rev property in the model.
        # XXX: need to check for when xattr is not supported by the fs
        x = Xattr(target, XATTR_NAMESPACE)
        x['revision'] = str(rev)
        x['uid'] = path  # this is from the repo where the names are uids
        x['mountpoint'] = mountpoint_id  # to quickly recover the
                                         # mountpoint this came from
        x['changeset'] = changeset

    def remove(self, path):
        """Hard remove the whole version history of an object"""
        i, d = self._rdata(path)
        if i and os.path.exists(i):
            os.unlink(i)
        # for small files d will not exist as the data is inlined to
        # the index
        if d and os.path.exists(d):
            os.unlink(d)

    def strip(self, path, rev):
        """attempt to remove a given revision from the history of
        path"""
        f = self.file(path)
        f.strip(rev, rev)
class HgBackingStore(FileBackingStore):
    """This backingstore for the datastore supports versioning by
    keeping a barebones Mercurial repository under the hood
    """
    capabilities = ("file", "versions")

    def __init__(self, uri, **kwargs):
        # the internal handle to the HgRepo
        self.repo = None
        uri = uri[len('hg:'):]
        super(HgBackingStore, self).__init__(uri, **kwargs)

    @staticmethod
    def parse(uri):
        """True when 'uri' addresses this kind of backingstore."""
        return uri.startswith("hg:")

    def check(self):
        """Verify that both the uri and base directories exist."""
        if not os.path.exists(self.uri): return False
        if not os.path.exists(self.base): return False
        return True

    def initialize(self):
        # NOTE(review): super(FileBackingStore, self) skips the direct
        # parent's initialize() and calls its grandparent's -- confirm
        # this is deliberate and not a typo for HgBackingStore.
        super(FileBackingStore, self).initialize()
        self.repo = FileRepo(self.base)

    def load(self):
        super(HgBackingStore, self).load()
        # connect the repo
        if not self.repo:
            self.repo = FileRepo(self.base)

    def tip(self, uid):
        """Return the tip revision number of the object 'uid'."""
        return self.repo.tip(uid)

    # File Management API
    def create(self, props, filelike):
        # generate the uid ourselves. we do this so we can track the
        # uid and changeset info
        # Add it to the index
        uid = create_uid()
        props['uid'] = uid
        props.setdefault('message', 'initial')
        uid, rev = self.checkin(props, filelike)
        return uid

    def get(self, uid, rev=None, env=None, allow_many=False):
        # we have a whole version chain, but get contracts to
        # return a single entry. In this case we default to 'tip'
        if not rev:
            rev = self.repo.tip(uid)
        results, count = self.indexmanager.get_by_uid_prop(uid, rev)
        if count == 0:
            raise KeyError(uid)
        elif count == 1 or allow_many:
            return results.next()
        raise ValueError("Got %d results for 'get' operation on %s" %(count, uid))

    ######################################################################
    # XXX: This whole policy is botched unless the journal grows an
    # # interface to display other versions of the main document
    # # which it doesn't have. If I do this then we don't see those
    # # versions in the journal and have no means to access
    # # them. For the time being we just index everything and let
    # # date sort them out.
    ######################################################################
    # # recover the old records for this uid
    # # update may rewrite/remove 1-n documents
    # # depending on the number of heads and so on
    # # this needs to be done under a single transaction
    # # actions will be a set of commands passed to
    # # xapianindex.enque
    # # the loop will process the entire action set at once
    #
    # # we need to pass head/tip tags from the parent to the child
    # # as a result of the update
    # # XXX: currently we are only going to index the HEADs and TIP
    # # revisions in the repository (and those marked with KEEP).
    # # This means that when we update
    # # with these tags:
    # # we can remove the old version from xapian
    # # w/o these tags:
    # # it gets a head tag, implying a branch
    # #
    # # because the keep flag indicates content is needed to be kept
    # # locally we have two real choices, either
    # # move it forward with each revision
    # # keep only the original tagged version
    # # and index the new one as well (because it will have the
    # # HEAD tag)
    ##########################################################################
    def update(self, uid, props, filelike):
        """Create a new version of the existing object 'uid'."""
        props['uid'] = uid
        uid, rev = self.checkin(props, filelike)
        return uid

    def delete(self, uid, rev=None):
        # delete the object at 'uid'; when no rev is passed, tip is
        # removed
        if rev is None:
            rev = self.repo.tip(uid)
        c = self.get(uid, rev)
        self.indexmanager.delete(c.id)
        self.repo.strip(uid, rev)

    def _targetFile(self, uid, target=None, ext=None, env=None):
        """Materialize the content of 'uid' at 'target' and return an
        open read/write handle on it."""
        c = self.indexmanager.get(uid)
        rev = int(c.get_property('vid'))
        #rev -= 1 # adjust for 0 based counting
        self.repo.dump(uid, rev, target, self.id, c.get_property('changeset'))
        # 'r+' is the valid read/write mode; the old mode string 'rw'
        # is not a real mode (glibc fopen silently opened read-only)
        return open(target, 'r+')

    def checkin(self, props, filelike):
        """create or update the content object, creating a new
        version"""
        uid = props.get("uid")
        c = None
        if uid is None:
            uid = create_uid()
            props['vid'] = "1"
        else:
            # is there an existing object with this uid?
            # XXX: if there isn't, should it be an error?
            r, count = self.indexmanager.get_by_uid_prop(uid,
                                                         props.get('vid', 'tip'))
            if count:
                # XXX: if count > 1 case should be handled
                c = r.next()
                # copy the value forward
                old_props = c.properties.copy()
                old_props.update(props)
                props = old_props
                # except vid which we increment here
                props['vid'] = str(int(props['vid']) + 1)
        props['uid'] = uid
        if filelike:
            message = props.setdefault('message', 'initial')
            # if a vid is passed in treat that as a parent revision
            # because this is an interaction with the repo however we
            # need to resolve that version's file_rev as the parent for
            # 'put'.
            # it may be that it didn't previously have a file at all, in
            # which case we must pass None
            # where c is the content lookup from before
            parent = None
            try:
                parent = c.get_property('file_rev', None)
            except Exception:
                # c may be None (a brand-new object) or may lack the
                # property; either way fall back to no parent.
                # (narrowed from a bare except, which also swallowed
                # KeyboardInterrupt/SystemExit)
                pass
            rev, parent, isTip, changeset = self.repo.put(uid, filelike,
                                                          parent, message,
                                                          meta=dict(uid=uid))
            # the create case is pretty simple
            # mark everything with defaults
            props['changeset'] = changeset
            props['file_rev'] = str(rev)
        self.indexmanager.index(props, filelike)
        return uid, props['vid']

    def checkout(self, uid, vid=None, target=None, dir=None):
        """checkout the object with this uid at vid (or HEAD if
        None). Returns (props, filename)"""
        # use the repo to drive the property search
        f = self.repo.file(uid)
        exists = f.count() > 0
        if vid:
            vid = f.node(int(vid) - 1)  # base 0 counting
        else:
            vid = f.tip()
        rev = f.rev(vid)
        # there will only be one thing with the changeset id of this
        # 'f'
        m = f._readmeta(vid)
        changeset = m.get('changeset')
        # NOTE(review): when 'exists' is true but 'changeset' is
        # missing, 'obj' is never bound and the code below raises
        # NameError -- confirm that case cannot happen in practice.
        if changeset:
            objs, count = self.indexmanager.search(dict(changeset=changeset))
            assert count == 1
            obj = objs.next()
        elif not exists:
            # There isn't any file content with this entry
            objs = self.indexmanager.get_by_uid_prop(uid)
            obj = objs[0].next()
        # we expect file content
        if exists:
            if not target:
                target, ext = obj.suggestName()
                if not target:
                    fd, fn = tempfile.mkstemp(suffix=ext, dir=dir)
                    target = fn
                    os.close(fd)
            if not target.startswith('/'):
                if not dir: dir = "/tmp"
                target = os.path.join(dir, target)
            if target:
                self.repo.dump(uid, rev, target, self.id, changeset)
        if not target: target = ""
        return obj.properties, target
if __name__ == "__main__":
    # Ad-hoc interactive smoke test for FileRepo.
    # NOTE(review): FileRepo in this file defines no 'blit' or
    # 'revisions' methods, so the calls below raise AttributeError as
    # written -- this demo appears to target an older FileRepo API;
    # confirm or update before use. 'rlcompleter2' is also a
    # third-party readline helper, not part of the stdlib.
    import rlcompleter2
    rlcompleter2.setup(verbose=0)
    TESTLOC = "/tmp/fltest"
    # start from a clean scratch directory
    os.system('rm -rf %s' % TESTLOC)
    c = FileRepo(TESTLOC)
    n = c.blit("foo", "this is a test")
    m = c.blit("bar", "another test")
    o = c.blit("foo", "\nanother line", mode="a")
    c.revisions("foo")
|