1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
|
"""
olpc.datastore.backingstore
~~~~~~~~~~~~~~~~~~~~~~~~~~~
management of stable storage for the datastore
"""
__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
__docformat__ = 'restructuredtext'
__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
__license__ = 'The GNU Public License V2+'
import cPickle as pickle
from datetime import datetime
import gnomevfs
import os
import re
import sha
import subprocess
import time
import threading
import errno
import shutil
import urllib
import traceback
import sys
import dbus
import xapian
import gobject
try:
import cjson
has_cjson = True
except ImportError:
import simplejson
has_cjson = False
from olpc.datastore.xapianindex import IndexManager
from olpc.datastore import bin_copy
from olpc.datastore import utils
from olpc.datastore import model
# changing this pattern impacts _targetFile
# Matches a trailing "(<n>)" collision suffix that
# FileBackingStore._targetFile appends while hunting for a
# non-colliding working-copy name. Raw string so the regex escapes
# are explicit rather than relying on '\(' / '\d' passing through
# the plain string literal unchanged.
filename_attempt_pattern = re.compile(r'\(\d+\)$')
import logging
# single shared logger for the datastore daemon
DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
logger = logging.getLogger(DS_LOG_CHANNEL)
#logger.setLevel(logging.DEBUG)
class BackingStore(object):
    """Abstract base for stable-storage backends.

    A backing store hides where and how file/blob content is kept
    (a local filesystem, a network mount, ...). Subclasses are
    expected to implement the metadata interface assumed by
    olpc.datastore.model; no implementation is provided here.
    """

    def __init__(self, uri, **kwargs):
        # kwargs configure the concrete backend; see the subclasses
        # for the options each one understands
        pass

    def __repr__(self):
        return "<%s %s: %s %s>" % (self.__class__.__name__, self.id,
                                   self.title, self.uri)

    # --- init phases ---------------------------------------------------
    @staticmethod
    def parse(uri):
        """Return True when this backend can mount the given uri.

        The base class handles nothing.
        """
        return False

    def initialize_and_load(self):
        """Validate the mount point, creating it first when needed.

        self.check() reports whether the located mount point is
        already a datastore. When it is not, self.initialize() builds
        one. Either way self.load() then brings it up.
        """
        if self.check() is False:
            self.initialize()
        self.load()

    def check(self):
        """Return True when the mount point is a usable datastore."""
        return False

    def load(self):
        """Load the index for this mount point and start its fulltext
        subsystem. This bootstraps the indexmanager (which create()
        may have just built)."""
        pass

    def initialize(self):
        """Set up a brand new mount point."""
        pass

    # --- informational ---------------------------------------------------
    def descriptor(self):
        """Return a dict describing the mount with at least:

        'id'    -- the id used to refer explicitly to the mount point
        'title' -- human readable identifier for the mountpoint
        'uri'   -- the uri which triggered the mount
        """
        pass

    @property
    def id(self):
        return self.descriptor()['id']

    @property
    def title(self):
        return self.descriptor()['title']

    # --- storage translation ----------------------------------------------
    def localizedName(self, uid=None, content=None, target=None):
        """Given any of: a uid, a content object, or a direct target
        filename (including all relative components under a store),
        return the localized filename that should be used _within_
        the repository to store that content object."""
        pass
import time  # NOTE(review): redundant -- 'time' is already imported near the top of this module
class AsyncCopy:
    """Incrementally copy a file in CHUNK_SIZE blocks from a gobject
    idle callback, calling ``completion`` when finished or on error.

    ``completion`` is invoked as completion(error) on failure, or as
    completion(None, dest) on success.
    """
    CHUNK_SIZE=65536

    def __init__(self, src, dest, completion):
        # src/dest are path names; the fds are opened in start()
        self.src = src
        self.dest = dest
        self.completion = completion
        self.src_fp = -1
        self.dest_fp = -1
        self.written = 0
        self.size = 0

    def _cleanup(self):
        # close both raw file descriptors opened by start()
        os.close(self.src_fp)
        os.close(self.dest_fp)

    def _copy_block(self, user_data=None):
        """Idle handler: copy one chunk. Returns True to be rescheduled
        by the main loop, False once the copy is finished or failed."""
        try:
            data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE)
            count = os.write(self.dest_fp, data)
            # NOTE(review): incremented by len(data) even when os.write
            # reports a short write (count < len(data)) -- the short-write
            # path below aborts anyway, so 'written' is only logged; confirm
            self.written += len(data)

            # error writing data to file?
            if count < len(data):
                logger.debug("AC: Error writing %s -> %s: wrote less than expected" % (self.src, self.dest))
                self._cleanup()
                self.completion(RuntimeError("Error writing data to destination file"))
                return False

            # FIXME: emit progress here

            # done? a read shorter than CHUNK_SIZE means EOF was reached
            if len(data) < AsyncCopy.CHUNK_SIZE:
                logger.debug("AC: Copied %s -> %s (%d bytes, %ds)" % (self.src, self.dest, self.written, time.time() - self.tstart))
                self._cleanup()
                self.completion(None, self.dest)
                return False
        except Exception, err:
            logger.debug("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err))
            self._cleanup()
            self.completion(err)
            return False

        return True

    def start(self):
        """Open source and destination, then schedule the chunked copy
        on the glib idle loop."""
        self.src_fp = os.open(self.src, os.O_RDONLY)
        self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644)

        stat = os.fstat(self.src_fp)
        self.size = stat[6]  # st_size, for the debug log below

        logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size))

        self.tstart = time.time()
        # the returned source id is not kept; the handler unschedules
        # itself by returning False
        sid = gobject.idle_add(self._copy_block)
class FileBackingStore(BackingStore):
""" A backing store that directs maps the storage of content
objects to an available filesystem.
# not really true, the change would result in the older version
having the last content and the new version as well. The old one
is left in the newest start rather than start state. if that can work...
The semantics of interacting
with the repositories mean that the current copy _can_ be edited
in place. Its the actions create/update that create new revisions
in the datastore and hence new versions.
"""
STORE_NAME = "store"
INDEX_NAME = "index"
DESCRIPTOR_NAME = "metainfo"
def __init__(self, uri, **kwargs):
""" FileSystemStore(path=<root of managed storage>)
"""
self.options = kwargs
self.local_indexmanager = self.options.get('local_indexmanager', True)
self.uri = uri
self.base = os.path.join(uri, self.STORE_NAME)
self.indexmanager = None
""" Current uid of the user that is calling DataStore.get_filename
through dbus. Needed for security stuff. It is an instance variable
instead of a method parameter because this is less invasive for Update 1.
"""
self.current_user_id = None
# source for an idle callback that exports to the file system the
# metadata from the index
self._export_metadata_source = None
# Informational
def descriptor(self):
"""return a dict with atleast the following keys
'id' -- the id used to refer explicitly to the mount point
'title' -- Human readable identifier for the mountpoint
'uri' -- The uri which triggered the mount
"""
# a hidden file with a pickled dict will live in the base
# directory for each storage
fn = os.path.join(self.base, self.DESCRIPTOR_NAME)
desc = None
if os.path.exists(fn):
try:
fp = open(fn, 'r')
desc = pickle.load(fp)
fp.close()
except:
desc = None
if not desc:
# the data isn't there, this could happen for a number of
# reasons (the store isn't writeable)
# or if the information on it was corrupt
# in this case, just create a new one
desc = {'id' : self.uri,
'uri' : self.uri,
'title' : self.uri
}
self.create_descriptor(**desc)
return desc
def create_descriptor(self, **kwargs):
# create the information descriptor for this store
# defaults will be created if need be
# passing limited values will leave existing keys in place
kwargs = utils._convert(kwargs)
fn = os.path.join(self.base, self.DESCRIPTOR_NAME)
desc = {}
if os.path.exists(fn):
fp = open(fn, 'r')
try:
desc = pickle.load(fp)
except:
desc = {}
finally:
fp.close()
desc.update(kwargs)
if 'id' not in desc: desc['id'] = utils.create_uid()
if 'uri' not in desc: desc['uri'] = self.uri
if 'title' not in desc: desc['title'] = self.uri
# TODO: Would be better to check if the device is present and
# don't try to update the descriptor file if it's not.
try:
fp = open(fn, 'w')
pickle.dump(desc, fp)
fp.close()
except IOError, e:
logging.error('Unable to write descriptor:\n' + \
''.join(traceback.format_exception(*sys.exc_info())))
@staticmethod
def parse(uri):
return os.path.isabs(uri) or os.path.isdir(uri)
def check(self):
if not os.path.exists(self.uri): return False
if not os.path.exists(self.base): return False
return True
def initialize(self):
if not os.path.exists(self.base):
os.makedirs(self.base)
# examine options and see what the indexmanager plan is
if self.local_indexmanager:
# create a local storage using the indexmanager
# otherwise we will connect the global manager
# in load
index_name = os.path.join(self.base, self.INDEX_NAME)
options = utils.options_for(self.options, 'indexmanager.')
im = IndexManager()
# This will ensure the fulltext and so on are all assigned
im.bind_to(self)
im.connect(index_name, **options)
self.create_descriptor(**options)
self.indexmanager = im
def load(self):
if not self.indexmanager and self.local_indexmanager:
# create a local storage using the indexmanager
# otherwise we will connect the global manager
# in load
index_name = os.path.join(self.base, self.INDEX_NAME)
options = utils.options_for(self.options, 'indexmanager.')
im = IndexManager()
desc = utils.options_for(self.options,
'indexmanager.',
invert=True)
if desc: self.create_descriptor(**desc)
# This will ensure the fulltext and so on are all assigned
im.bind_to(self)
im.connect(index_name)
self.indexmanager = im
# Check that all entries have their metadata in the file system.
if not os.path.exists(os.path.join(self.base, '.metadata.exported')):
uids_to_export = []
uids = self.indexmanager.get_all_ids()
for uid in uids:
if not os.path.exists(os.path.join(self.base, uid + '.metadata')):
uids_to_export.append(uid)
if uids_to_export:
self._export_metadata_source = gobject.idle_add(
self._export_metadata, uids_to_export)
else:
open(os.path.join(self.base, '.metadata.exported'), 'w').close()
def _export_metadata(self, uids_to_export):
uid = uids_to_export.pop()
props = self.indexmanager.get(uid).properties
self._store_metadata(uid, props)
return len(uids_to_export) > 0
def bind_to(self, datastore):
## signal from datastore that we are being bound to it
self.datastore = datastore
def localizedName(self, uid=None, content=None, target=None):
"""taking any of uid, a content object, or a direct target
filename (which includes all of the relative components under a
store). Return the localized filename that should be used _within_
the repository for the storage of this content object
"""
if target: return os.path.join(self.base, target)
elif content:
# see if it expects a filename
fn, ext = content.suggestName()
if fn: return os.path.join(self.base, fn)
if ext: return os.path.join(self.base, "%s.%s" %
(content.id, ext))
if not uid: uid = content.id
if uid:
return os.path.join(self.base, uid)
else:
raise ValueError("""Nothing submitted to generate internal
storage name from""")
def _translatePath(self, uid):
"""translate a UID to a path name"""
# paths into the datastore
return os.path.join(self.base, str(uid))
def _targetFile(self, uid, target=None, ext=None, env=None):
logging.debug('FileBackingStore._targetFile: %r %r %r %r' % (uid, target, ext, env))
# paths out of the datastore, working copy targets
path = self._translatePath(uid)
if not os.path.exists(path):
return None
if target: targetpath = target
else:
targetpath = uid.replace('/', '_').replace('.', '__')
if ext:
if not ext.startswith('.'): ext = ".%s" % ext
targetpath = "%s%s" % (targetpath, ext)
use_instance_dir = os.path.exists('/etc/olpc-security') and \
os.getuid() != self.current_user_id
if use_instance_dir:
if not self.current_user_id:
raise ValueError("Couldn't determine the current user uid.")
base = os.path.join(os.environ['HOME'], 'isolation', '1', 'uid_to_instance_dir',
str(self.current_user_id))
else:
profile = os.environ.get('SUGAR_PROFILE', 'default')
base = os.path.join(os.path.expanduser('~'), '.sugar', profile, 'data')
if not os.path.exists(base):
os.makedirs(base)
targetpath = os.path.join(base, targetpath)
attempt = 0
while os.path.exists(targetpath):
# here we look for a non-colliding name
# this is potentially a race and so we abort after a few
# attempts
targetpath, ext = os.path.splitext(targetpath)
if filename_attempt_pattern.search(targetpath):
targetpath = filename_attempt_pattern.sub('', targetpath)
attempt += 1
if attempt > 9:
targetpath = "%s(%s).%s" % (targetpath, time.time(), ext)
break
targetpath = "%s(%s)%s" % (targetpath, attempt, ext)
# Try to make the original file readable. This can fail if the file is
# in FAT filesystem.
try:
os.chmod(path, 0604)
except OSError, e:
if e.errno != errno.EPERM:
raise
# Try to hard link from the original file to the targetpath. This can
# fail if the file is in a different filesystem. Do a symlink instead.
try:
os.link(path, targetpath)
except OSError, e:
if e.errno == errno.EXDEV:
os.symlink(path, targetpath)
else:
raise
return open(targetpath, 'r')
def _mapContent(self, uid, fp, path, env=None):
"""map a content object and the file in the repository to a
working copy.
"""
# env would contain things like cwd if we wanted to map to a
# known space
content = self.indexmanager.get(uid)
# we need to map a copy of the content from the backingstore into the
# activities addressable space.
# map this to a rw file
if fp:
target, ext = content.suggestName()
targetfile = self._targetFile(uid, target, ext, env)
content.file = targetfile
if self.options.get('verify', False):
c = sha.sha()
for line in targetfile:
c.update(line)
fp.seek(0)
if c.hexdigest() != content.checksum:
raise ValueError("Content for %s corrupt" % uid)
return content
def _writeContent_complete(self, path, completion=None):
self._set_permissions_if_possible(path)
if completion is None:
return path
completion(None, path)
return None
def _set_permissions_if_possible(self, path):
try:
os.chmod(path, 0604)
except OSError, e:
# This can fail for usb sticks.
if e.errno != errno.EPERM:
raise
def _writeContent(self, uid, filelike, replace=True, can_move=False, target=None,
completion=None):
"""Returns: path of file in datastore (new path if it was copied/moved)"""
content = None
if target: path = target
else:
path = self._translatePath(uid)
if replace is False and os.path.exists(path):
raise KeyError("objects with path:%s for uid:%s exists" %(
path, uid))
if filelike.name != path:
# protection on inplace stores
if completion is None:
bin_copy.bin_copy(filelike.name, path)
self._set_permissions_if_possible(path)
return path
if can_move:
bin_copy.bin_mv(filelike.name, path)
self._set_permissions_if_possible(path)
return self._writeContent_complete(path, completion)
# Otherwise, async copy
aco = AsyncCopy(filelike.name, path, completion)
aco.start()
else:
return self._writeContent_complete(path, completion)
def _checksum(self, filename):
c = sha.sha()
fp = open(filename, 'r')
for line in fp:
c.update(line)
fp.close()
return c.hexdigest()
# File Management API
def _encode_json(self, metadata, file_path):
if has_cjson:
f = open(file_path, 'w')
f.write(cjson.encode(metadata))
f.close()
else:
simplejson.dump(metadata, open(file_path, 'w'))
def _store_metadata(self, uid, props):
t = time.time()
temp_path = os.path.join(self.base, '.temp_metadata')
props = props.copy()
for property_name in model.defaultModel.get_external_properties():
if property_name in props:
del props[property_name]
self._encode_json(props, temp_path)
path = os.path.join(self.base, uid + '.metadata')
os.rename(temp_path, path)
logging.debug('exported metadata: %r s.' % (time.time() - t))
def _delete_metadata(self, uid):
path = os.path.join(self.base, uid + '.metadata')
if os.path.exists(path):
os.unlink(path)
def _create_completion(self, uid, props, completion, exc=None, path=None):
if exc:
completion(exc)
return
try:
# Index the content this time
self.indexmanager.index(props, path)
completion(None, uid)
except Exception, exc:
completion(exc)
def create_async(self, props, filelike, can_move=False, completion=None):
if completion is None:
raise RuntimeError("Completion must be valid for async create")
uid = self.indexmanager.index(props)
self._store_metadata(uid, props)
props['uid'] = uid
if filelike:
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
self._writeContent(uid, filelike, replace=False, can_move=can_move,
completion=lambda *args: self._create_completion(uid, props, completion, *args))
else:
completion(None, uid)
def create(self, props, filelike, can_move=False):
if filelike:
uid = self.indexmanager.index(props)
self._store_metadata(uid, props)
props['uid'] = uid
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
path = self._writeContent(uid, filelike, replace=False, can_move=can_move)
self.indexmanager.index(props, path)
return uid
else:
uid = self.indexmanager.index(props)
self._store_metadata(uid, props)
return uid
def get(self, uid, env=None, allowMissing=False, includeFile=False):
content = self.indexmanager.get(uid)
if not content: raise KeyError(uid)
path = self._translatePath(uid)
fp = None
# not all content objects have a file
if includeFile and os.path.exists(path):
fp = open(path, 'r')
# now return a Content object from the model associated with
# this file object
content = self._mapContent(uid, fp, path, env)
if fp:
fp.close()
return content
def _update_completion(self, uid, props, completion, exc=None, path=None):
if exc is not None:
completion(exc)
return
try:
self.indexmanager.index(props, path)
completion()
except Exception, exc:
completion(exc)
def update_async(self, uid, props, filelike, can_move=False, completion=None):
logging.debug('backingstore.update_async')
if filelike is None:
raise RuntimeError("Filelike must be valid for async update")
if completion is None:
raise RuntimeError("Completion must be valid for async update")
props['uid'] = uid
self._store_metadata(uid, props)
if filelike:
uid = self.indexmanager.index(props, filelike)
props['uid'] = uid
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
self._writeContent(uid, filelike, can_move=can_move,
completion=lambda *args: self._update_completion(uid, props, completion, *args))
else:
self.indexmanager.index(props)
completion()
def update(self, uid, props, filelike=None, can_move=False):
props['uid'] = uid
self._store_metadata(uid, props)
if filelike:
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
path = self._writeContent(uid, filelike, can_move=can_move)
self.indexmanager.index(props, path)
else:
self.indexmanager.index(props)
def _delete_external_properties(self, uid):
external_properties = model.defaultModel.get_external_properties()
for property_name in external_properties:
file_path = os.path.join(self.base, property_name, uid)
if os.path.exists(file_path):
logging.debug('deleting external property: %r' % file_path)
os.unlink(file_path)
def delete(self, uid, allowMissing=True):
self._delete_external_properties(uid)
self._delete_metadata(uid)
self.indexmanager.delete(uid)
path = self._translatePath(uid)
if os.path.exists(path):
os.unlink(path)
else:
if not allowMissing:
raise KeyError("object for uid:%s missing" % uid)
def get_uniquevaluesfor(self, propertyname):
return self.indexmanager.get_uniquevaluesfor(propertyname)
def get_external_property(self, doc_id, key):
# external properties default to the following storage
# <repo>/key/uid which is the file containing the external
# data. its contents is returned by this call
# when missing or absent '' is returned
pfile = os.path.join(self.base, key, str(doc_id))
if os.path.exists(pfile): v = open(pfile, 'r').read()
else: v = ''
return dbus.ByteArray(v)
def set_external_property(self, doc_id, key, value):
pdir = os.path.join(self.base, key)
if not os.path.exists(pdir): os.mkdir(pdir)
pfile = os.path.join(pdir, doc_id)
fp = open(pfile, 'w')
fp.write(value)
fp.close()
def find(self, query, order_by=None, limit=None, offset=0):
if not limit: limit = 4069
return self.indexmanager.search(query, start_index=offset, end_index=limit, order_by=order_by)
def ids(self):
return self.indexmanager.get_all_ids()
def stop(self):
if self._export_metadata_source is not None:
gobject.source_remove(self._export_metadata_source)
self.indexmanager.stop()
def complete_indexing(self):
self.indexmanager.complete_indexing()
class InplaceFileBackingStore(FileBackingStore):
"""Like the normal FileBackingStore this Backingstore manages the
storage of files, but doesn't move files into a repository. There
are no working copies. It simply adds index data through its
indexmanager and provides fulltext ontop of a regular
filesystem. It does record its metadata relative to this mount
point.
This is intended for USB keys and related types of attachable
storage.
"""
STORE_NAME = ".olpc.store"
def __init__(self, uri, **kwargs):
# remove the 'inplace:' scheme
uri = uri[len('inplace:'):]
super(InplaceFileBackingStore, self).__init__(uri, **kwargs)
# use the original uri
self.uri = uri
self._walk_source = None
@staticmethod
def parse(uri):
return uri.startswith("inplace:")
def check(self):
if not os.path.exists(self.uri): return False
if not os.path.exists(self.base): return False
return True
def load(self):
try:
super(InplaceFileBackingStore, self).load()
except xapian.DatabaseCorruptError, e:
# TODO: Try to recover in a smarter way than deleting the base
# dir and reinitializing the index.
logging.error('Error while trying to load mount point %s: %s. ' \
'Will try to renitialize and load again.' % (self.base, e))
# Delete the base dir and its contents
for root, dirs, files in os.walk(self.base, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
os.rmdir(root)
self.initialize()
self.load()
return
# now map/update the existing data into the indexes
# but do it async
files_to_check = []
for dirpath, dirname, filenames in os.walk(self.uri):
if self.base in dirpath: continue
if self.STORE_NAME in dirname:
dirname.remove(self.STORE_NAME)
# blacklist all the hidden directories
if '/.' in dirpath: continue
for fn in filenames:
# ignore conventionally hidden files
if fn.startswith("."):
continue
files_to_check.append((dirpath, fn))
self._walk_source = gobject.idle_add(self._walk, files_to_check)
def _walk(self, files_to_check):
dirpath, fn = files_to_check.pop()
logging.debug('InplaceFileBackingStore._walk(): %r' % fn)
try:
source = os.path.join(dirpath, fn)
relative = source[len(self.uri)+1:]
result, count = self.indexmanager.search(dict(filename=relative))
mime_type = gnomevfs.get_mime_type(source)
stat = os.stat(source)
ctime = datetime.fromtimestamp(stat.st_ctime).isoformat()
mtime = datetime.fromtimestamp(stat.st_mtime).isoformat()
title = os.path.splitext(os.path.split(source)[1])[0]
metadata = dict(filename=relative,
mime_type=mime_type,
ctime=ctime,
mtime=mtime,
title=title)
if not count:
# create a new record
self.create(metadata, source)
else:
# update the object with the new content iif the
# checksum is different
# XXX: what if there is more than one? (shouldn't
# happen)
# FIXME This is throwing away all the entry metadata.
# Disabled for trial-3. We are not doing indexing
# anyway so it would just update the mtime which is
# not that useful. Also the journal is currently
# setting the mime type before saving the file making
# the mtime check useless.
#
# content = result.next()
# uid = content.id
# saved_mtime = content.get_property('mtime')
# if mtime != saved_mtime:
# self.update(uid, metadata, source)
pass
except Exception, e:
logging.exception('Error while processing %r: %r' % (fn, e))
if files_to_check:
return True
else:
self._walk_source = None
return False
def _translatePath(self, uid):
try: content = self.indexmanager.get(uid)
except KeyError: return None
return os.path.join(self.uri, content.get_property('filename', uid))
## def _targetFile(self, uid, target=None, ext=None, env=None):
## # in this case the file should really be there unless it was
## # deleted in place or something which we typically isn't
## # allowed
## # XXX: catch this case and remove the index
## targetpath = self._translatePath(uid)
## return open(targetpath, 'rw')
# File Management API
def create_async(self, props, filelike, completion, can_move=False):
"""Inplace backing store doesn't copy, so no need for async"""
try:
uid = self.create(props, filelike, can_move)
completion(None, uid)
except Exception, exc:
completion(exc)
def _get_unique_filename(self, suggested_filename):
# Invalid characters in VFAT filenames. From
# http://en.wikipedia.org/wiki/File_Allocation_Table
invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|', '\x7F']
invalid_chars.extend([chr(x) for x in range(0, 32)])
filename = suggested_filename
for char in invalid_chars:
filename = filename.replace(char, '_')
# FAT limit is 255, leave some space for uniqueness
max_len = 250
if len(filename) > max_len:
name, extension = os.path.splitext(filename)
filename = name[0:max_len - extension] + extension
if os.path.exists(os.path.join(self.uri, filename)):
i = 1
while len(filename) <= max_len:
name, extension = os.path.splitext(filename)
filename = name + '_' + str(i) + extension
if not os.path.exists(os.path.join(self.uri, filename)):
break
i += 1
if len(filename) > max_len:
filename = None
return filename
def create(self, props, filelike, can_move=False):
# the file would have already been changed inplace
# don't touch it
proposed_name = None
if filelike:
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
# usually with USB drives and the like the file we are
# indexing is already on it, however in the case of moving
# files to these devices we need to detect this case and
# place the file
proposed_name = props.get('filename', None)
if proposed_name is None:
proposed_name = props.get('suggested_filename', None)
if proposed_name is None:
proposed_name = os.path.split(filelike.name)[1]
proposed_name = self._get_unique_filename(proposed_name)
# record the name before qualifying it to the store
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
uid = self.indexmanager.index(props)
props['uid'] = uid
path = filelike
if proposed_name and not os.path.exists(proposed_name):
path = self._writeContent(uid, filelike, replace=False, target=proposed_name)
self.indexmanager.index(props, path)
return uid
def get(self, uid, env=None, allowMissing=False):
content = self.indexmanager.get(uid)
if not content: raise KeyError(uid)
return content
def update_async(self, uid, props, filelike, completion, can_move=False):
try:
self.update(uid, props, filelike, can_move)
completion()
except Exception, exc:
completion(exc)
def update(self, uid, props, filelike=None, can_move=False):
# the file would have already been changed inplace
# don't touch it
props['uid'] = uid
proposed_name = None
if filelike:
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
# usually with USB drives and the like the file we are
# indexing is already on it, however in the case of moving
# files to these devices we need to detect this case and
# place the file
proposed_name = props.get('filename', None)
if not proposed_name:
proposed_name = os.path.split(filelike.name)[1]
# record the name before qualifying it to the store
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
path = filelike
if proposed_name:
path = self._writeContent(uid, filelike, replace=True, target=proposed_name)
self.indexmanager.index(props, path)
def delete(self, uid):
self._delete_external_properties(uid)
c = self.indexmanager.get(uid)
path = c.get_property('filename', None)
self.indexmanager.delete(uid)
if path:
path = os.path.join(self.uri, path)
if os.path.exists(path):
os.unlink(path)
def stop(self):
if self._walk_source is not None:
gobject.source_remove(self._walk_source)
self.indexmanager.stop(force=True)
def complete_indexing(self):
# TODO: Perhaps we should move the inplace indexing to be sync here?
self.indexmanager.complete_indexing()
|