""" 
olpc.datastore.backingstore
~~~~~~~~~~~~~~~~~~~~~~~~~~~
management of stable storage for the datastore

""" 

__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
__docformat__ = 'restructuredtext'
__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
__license__ = 'The GNU General Public License V2+'

import cPickle as pickle
from datetime import datetime
import gnomevfs
import os
import re
import time
import threading

from olpc.datastore.xapianindex import IndexManager
from olpc.datastore import bin_copy
from olpc.datastore import utils

# matches a trailing collision counter such as "(3)"; changing this
# pattern impacts _targetFile
filename_attempt_pattern = re.compile(r'\(\d+\)$')

# capabilities
# A list of strings on a backing store indicating that it provides an
# implementation of some feature another layer can expect.
# "file" indicates that the store can return traditional file-like
# objects and is the baseline capability expected by most other layers.
# "inplace" indicates that the backing store will update/index files
# where they reside on the store; files will not be moved.
# "versions" indicates that the store keeps revisions of content
# internally. This has implications for the search layer, which may
# perform different operations on stores that exhibit this capability.
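#
# For example, a layer holding a mounted store might branch on these
# strings (a minimal sketch; `mount` stands for any BackingStore
# instance and is not defined in this module):
#
#   if "inplace" in mount.capabilities:
#       pass  # index files where they live; nothing is copied
#   elif "file" in mount.capabilities:
#       pass  # fetch a working copy through get()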


class BackingStore(object):
    """Backing stores manage stable storage. We abstract out the
    management of file/blob storage through this class, as well as the
    location of the backing store itself (it may be done via a network
    connection for example).

    While the backingstore is responsible for implementing the
    metadata interface no implementation is provided here. It is
    assumed by that interface that all the features enumerated in
    olpc.datastore.model are provided.
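
    A concrete backend is expected to override at least the following
    (a minimal sketch; ``ExampleStore`` and its uri scheme are
    hypothetical)::

        class ExampleStore(BackingStore):
            capabilities = ("file",)
            @staticmethod
            def parse(uri):
                return uri.startswith('example:')
            def check(self):
                return False
            def initialize(self):
                pass
            def load(self):
                pass
            def descriptor(self):
                return {'id': 'example', 'title': 'example',
                        'uri': 'example:'}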
    
    """
    capabilities = ()
    
    def __init__(self, uri, **kwargs):
       """The kwargs are used to configure the backend so it can
        provide its interface. See specific backends for details
        """
        pass

    def __repr__(self):
        return "<%s %s: %s %s>" % (self.__class__.__name__, self.id,
                            self.title, self.uri)
    # Init phases
    @staticmethod
    def parse(uri):
        """parse the uri into an actionable mount-point.
        Returns True or False indicating if this backend handles a
        given uri.
        """
        return False

    def initialize_and_load(self):
        """phase to check the state of the located mount point, this
        method returns True (mount point is valid) or False (invalid
        or uninitialized mount point).

        self.check() which must return a boolean should check if the
        result of self.locate() is already a datastore and then
        initialize/load it according to self.options.
        
        When True self.load() is invoked.
        When False self.create() followed by self.load() is invoked.
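
        A typical bootstrap looks like (a minimal sketch; the path is
        hypothetical)::

            bs = FileBackingStore('/srv/datastore')
            bs.initialize_and_load()   # creates the store on first use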
        """
        if self.check() is False:
            self.initialize()
        self.load()

    def check(self):
        return False
    
    def load(self):
        """load the index for a given mount-point, then initialize its
        fulltext subsystem. This is the routine that will bootstrap
        the indexmanager (though create() may have just created it)
        """
        pass

    def initialize(self):
        """Initialize a new mount point"""
        pass
    
    # Informational
    def descriptor(self):
        """return a dict with atleast the following keys
              'id' -- the id used to refer explicitly to the mount point
              'title' -- Human readable identifier for the mountpoint
              'uri' -- The uri which triggered the mount
        """
        pass

    @property
    def id(self): return self.descriptor()['id']
    @property
    def title(self): return self.descriptor()['title']

    # Storage Translation
    def localizedName(self, uid=None, content=None, target=None):
        """taking any of uid, a content object, or a direct target
    filename (which includes all of the relative components under a
    store). Return the localized filename that should be used _within_
    the repository for the storage of this content object
    """
        pass


class FileBackingStore(BackingStore):
    """ A backing store that directs maps the storage of content
    objects to an available filesystem.


    # not really true, the change would result in the older version
    having the last content and the new version as well. The old one
    is left in the newest start rather than start state. if that can work...
    The semantics of interacting
    with the repositories mean that the current copy _can_ be edited
    in place. Its the actions create/update that create new revisions
    in the datastore and hence new versions.
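
    A typical round trip (a minimal sketch; the paths shown are
    hypothetical)::

        store = FileBackingStore('/srv/datastore')
        store.initialize_and_load()
        uid = store.create({'title': 'note'}, '/tmp/note.txt')
        store.complete_indexing()
        content = store.get(uid, includeFile=True)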
    """
    STORE_NAME = "store"
    INDEX_NAME = "index"
    DESCRIPTOR_NAME = "metainfo"

    capabilities = ("file")
    
    def __init__(self, uri, **kwargs):
        """ FileSystemStore(path=<root of managed storage>)
        """
        self.options = kwargs
        self.local_indexmanager = self.options.get('local_indexmanager', True)

        self.uri = uri
        self.base = os.path.join(uri, self.STORE_NAME)
        self.indexmanager = None
        
    # Informational
    def descriptor(self):
        """return a dict with atleast the following keys
              'id' -- the id used to refer explicitly to the mount point
              'title' -- Human readable identifier for the mountpoint
              'uri' -- The uri which triggered the mount
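
        Example contents with illustrative values::

              {'id': '/mnt/usb', 'uri': '/mnt/usb', 'title': '/mnt/usb'}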
        """
        # a hidden file with a pickled dict will live in the base
        # directory for each storage
        fn = os.path.join(self.base, self.DESCRIPTOR_NAME)
        desc = None
        if os.path.exists(fn):
            try:
                fp = open(fn, 'r')
                desc = pickle.load(fp)
                fp.close()
            except:
                desc = None
        if not desc:
            # the data isn't there, this could happen for a number of
            # reasons (the store isn't writeable)
            # or if the information on it was corrupt
            # in this case, just create a new one
            desc = {'id' : self.uri,
                    'uri' : self.uri,
                    'title' : self.uri
                    }
            self.create_descriptor(**desc)
            
        return desc
    
    
    def create_descriptor(self, **kwargs):
        # create the information descriptor for this store
        # defaults will be created if need be
        # passing limited values will leave existing keys in place
        kwargs = utils._convert(kwargs)
        fn = os.path.join(self.base, self.DESCRIPTOR_NAME)
        desc = {}
        if os.path.exists(fn):
            fp = open(fn, 'r')
            try:
                desc = pickle.load(fp)
            except:
                desc = {}
            finally:
                fp.close()

        desc.update(kwargs)
        
        if 'id' not in desc: desc['id'] = utils.create_uid()
        if 'uri' not in desc: desc['uri'] = self.uri
        if 'title' not in desc: desc['title'] = self.uri


        fp = open(fn, 'w')
        pickle.dump(desc, fp)
        fp.close()


    @staticmethod
    def parse(uri):
        return os.path.isabs(uri) or os.path.isdir(uri)

    def check(self):
        if not os.path.exists(self.uri): return False
        if not os.path.exists(self.base): return False
        return True
    
    def initialize(self):
        if not os.path.exists(self.base):
            os.makedirs(self.base)

        # examine options and see what the indexmanager plan is
        if self.local_indexmanager:
            # create a local storage using the indexmanager
            # otherwise we will connect the global manager
            # in load
            index_name = os.path.join(self.base, self.INDEX_NAME)
            options = utils.options_for(self.options, 'indexmanager.')
            im = IndexManager()
            # This will ensure the fulltext and so on are all assigned
            im.bind_to(self)
            im.connect(index_name, **options)

            self.create_descriptor(**options)
            self.indexmanager = im
            
    def load(self):
        if not self.indexmanager and self.local_indexmanager:
            # create local storage using the indexmanager
            # (we only get here when initialize() didn't already
            # bind one)
            index_name = os.path.join(self.base, self.INDEX_NAME)
            options = utils.options_for(self.options, 'indexmanager.')
            im = IndexManager()

            desc = utils.options_for(self.options,
                                     'indexmanager.',
                                     invert=True)
            if desc: self.create_descriptor(**desc)
                
            # This will ensure the fulltext and so on are all assigned
            im.bind_to(self)
            im.connect(index_name)

            self.indexmanager = im
            
    def bind_to(self, datastore):
        ## signal from datastore that we are being bound to it
        self.datastore = datastore

    def localizedName(self, uid=None, content=None, target=None):
        """taking any of uid, a content object, or a direct target
    filename (which includes all of the relative components under a
    store). Return the localized filename that should be used _within_
    the repository for the storage of this content object
    """
        if target: return os.path.join(self.base, target)
        elif content:
            # see if it expects a filename
            fn, ext = content.suggestName()
            if fn: return os.path.join(self.base, fn)
            if ext: return os.path.join(self.base, "%s.%s" %
                                        (content.id, ext))
            if not uid: uid = content.id

        if uid:
            return os.path.join(self.base, uid)
        else:
            raise ValueError("Nothing submitted to generate internal "
                             "storage name from")
        
    def _translatePath(self, uid):
        """translate a UID to a path name"""
        # paths into the datastore
        return os.path.join(self.base, str(uid))

    def _targetFile(self, uid, target=None, ext=None, env=None):
        # paths out of the datastore, working copy targets
        path = self._translatePath(uid)
        if not os.path.exists(path):
            return None
        
        if target: targetpath = target
        else:
            targetpath = uid.replace('/', '_').replace('.', '__')
            if ext:
                if not ext.startswith('.'): ext = ".%s" % ext
                targetpath = "%s%s" % (targetpath, ext)

        base = '/tmp'
        if env: base = env.get('cwd', base)
        
        targetpath = os.path.join(base, targetpath)
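        # collision avoidance: each pass below rewrites the name with a
        # counter, e.g. "report.txt" -> "report(1).txt" -> "report(2).txt";
        # after ten attempts a timestamp replaces the counter to
        # guarantee uniqueness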
        attempt = 0
        while os.path.exists(targetpath):
            # here we look for a non-colliding name
            # this is potentially a race and so we abort after a few
            # attempts
            targetpath, ext = os.path.splitext(targetpath)
            
            if filename_attempt_pattern.search(targetpath):
                targetpath = filename_attempt_pattern.sub('', targetpath)
                
            attempt += 1
            if attempt > 9:
                targetpath = "%s(%s).%s" % (targetpath, time.time(), ext)
                break

            targetpath = "%s(%s)%s" % (targetpath, attempt, ext)

        bin_copy.bin_copy(path, targetpath)
        return targetpath
            
    def _mapContent(self, uid, fp, path, env=None):
        """map a content object and the file in the repository to a
        working copy.
        """
        # env would contain things like cwd if we wanted to map to a
        # known space
        
        content = self.indexmanager.get(uid)
        # we need to map a copy of the content from the backingstore
        # into the activity's addressable space, as a read/write file
        if fp:
            target, ext = content.suggestName()
            targetfile = self._targetFile(uid, target, ext, env)
            content.file = targetfile
        
        return content

    def _writeContent(self, uid, filelike, replace=True, target=None):
        if not filelike: return
        if target: path = target
        else:
            path = self._translatePath(uid)
            
        if replace is False and os.path.exists(path):
            raise KeyError("objects with path:%s for uid:%s exists" %(
                path, uid))

        bin_copy.bin_copy(filelike, path)
        
    # File Management API
    def create(self, props, filelike):
        uid = self.indexmanager.index(props, filelike)
        self._writeContent(uid, filelike, replace=False)
        return uid
    
    def get(self, uid, env=None, allowMissing=False, includeFile=False):
        content = self.indexmanager.get(uid)
        if not content: raise KeyError(uid)
        path = self._translatePath(uid)
        fp = None
        # not all content objects have a file
        if includeFile and os.path.exists(path):
            fp = open(path, 'r')
            # now return a Content object from the model associated with
            # this file object
        return self._mapContent(uid, fp, path, env)

    def update(self, uid, props, filelike=None):
        if 'uid' not in props: props['uid'] = uid
            
        self.indexmanager.index(props, filelike)
        self._writeContent(uid, filelike)

    def delete(self, uid, allowMissing=True):
        self.indexmanager.delete(uid)
        path = self._translatePath(uid)
        if os.path.exists(path):
            os.unlink(path)
        else:
            if not allowMissing:
                raise KeyError("object for uid:%s missing" % uid)            
        
    def get_uniquevaluesfor(self, propertyname):
        return self.indexmanager.get_uniquevaluesfor(propertyname)
    

    def find(self, query):
        return self.indexmanager.search(query)

    def stop(self):
        self.indexmanager.stop()

    def complete_indexing(self):
        self.indexmanager.complete_indexing()

class InplaceFileBackingStore(FileBackingStore):
    """Like the normal FileBackingStore this Backingstore manages the
    storage of files, but doesn't move files into a repository. There
    are no working copies. It simply adds index data through its
    indexmanager and provides fulltext ontop of a regular
    filesystem. It does record its metadata relative to this mount
    point.

    This is intended for USB keys and related types of attachable
    storage.
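
    A mount is requested with an "inplace:" uri, e.g. (a minimal
    sketch; the path is hypothetical)::

        store = InplaceFileBackingStore('inplace:/mnt/usb')
        store.initialize_and_load()   # also starts the background walker
        store.complete_indexing()     # block until the walk finishes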
    """

    STORE_NAME = ".olpc.store"
    capabilities = ("file", "inplace")
    
    def __init__(self, uri, **kwargs):
        # remove the 'inplace:' scheme
        uri = uri[len('inplace:'):]
        super(InplaceFileBackingStore, self).__init__(uri, **kwargs)
        # keep the stripped uri (the mount point path itself)
        self.uri = uri
        self.walker = None
        
    @staticmethod
    def parse(uri):
        return uri.startswith("inplace:")

    def check(self):
        if not os.path.exists(self.uri): return False
        if not os.path.exists(self.base): return False
        return True

        
    def load(self):
        super(InplaceFileBackingStore, self).load()
        # now map/update the existing data into the indexes
        # but do it async
        self.walker = threading.Thread(target=self._walk)
        self._runWalker = True
        self.walker.setDaemon(True)
        self.walker.start()

    def _walk(self):
        # XXX: a version that checked xattr for uid would be simple
        # and faster
        # scan the uri for all non self.base files and update their
        # records in the db
        for dirpath, dirnames, filenames in os.walk(self.uri):
            # see if there is an entry for the filename
            if self.base in dirpath: continue
            if self.STORE_NAME in dirnames:
                dirnames.remove(self.STORE_NAME)

            # other files and dirs to blacklist
            if '.Trashes' in dirpath: continue
                
            
            for fn in filenames:
                # give the thread a chance to exit
                if not self._runWalker: break
                # blacklist files
                #   ignore conventionally hidden files
                if fn.startswith("."): continue
                
                source = os.path.join(dirpath, fn)
                relative = source[len(self.uri)+1:]
                source = os.path.abspath(source)
                
                result, count = self.indexmanager.search(dict(filename=relative))
                mime_type = gnomevfs.get_mime_type(source)
                stat = os.stat(source)
                ctime = datetime.fromtimestamp(stat.st_ctime).isoformat()
                mtime = datetime.fromtimestamp(stat.st_mtime).isoformat()
                title = os.path.splitext(os.path.split(source)[1])[0]
                metadata = dict(filename=relative,
                                mime_type=mime_type,
                                ctime=ctime,
                                mtime=mtime,
                                title=title)
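                # the record built above looks like, with illustrative
                # values: {'filename': 'docs/a.txt', 'mime_type':
                # 'text/plain', 'ctime': '2007-06-01T12:00:00',
                # 'mtime': '2007-06-02T08:30:00', 'title': 'a'}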
                if not count:
                    # create a new record
                    self.create(metadata, source)
                else:
                    # update the object with the new content only if
                    # the mtime differs
                    # XXX: what if there is more than one? (shouldn't
                    # happen)
                    content = result.next()
                    uid = content.id
                    saved_mtime = content.get_property('mtime', None)
                    if mtime != saved_mtime:
                        self.update(uid, metadata, source)
        self.indexmanager.flush()
        return

    def _translatePath(self, uid):
        try: content = self.indexmanager.get(uid)
        except KeyError: return None
        return os.path.join(self.uri, content.get_property('filename', uid))


    # File Management API
    def create(self, props, filelike):
        # the file would have already been changed inplace
        # don't touch it
        proposed_name = None
        if filelike:
            # with USB drives and the like, the file we are indexing
            # is usually already on the device; when a file is being
            # moved onto the device, however, we must detect that case
            # and place the file ourselves
            proposed_name = props.get('filename', None)
            if not proposed_name:
                proposed_name = os.path.split(filelike)[1]
            # record the name before qualifying it to the store
            props['filename'] = proposed_name
            proposed_name = os.path.join(self.uri, proposed_name)
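            # e.g. a filename of 'notes/todo.txt' on a store mounted at
            # '/mnt/usb' yields '/mnt/usb/notes/todo.txt' (illustrative
            # paths)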

        uid = self.indexmanager.index(props, filelike)

        if proposed_name and not os.path.exists(proposed_name):
            self._writeContent(uid, filelike, replace=False, target=proposed_name)

        return uid
    
    def get(self, uid, env=None, allowMissing=False):
        content = self.indexmanager.get(uid)
        if not content: raise KeyError(uid)
        return content

    def update(self, uid, props, filelike=None):
        # the file would have already been changed inplace
        # don't touch it
        props['uid'] = uid

        proposed_name = None
        if filelike:
            if isinstance(filelike, basestring):
                # let's treat it as a filename
                filelike = open(filelike, "r")
            filelike.seek(0)
            # with USB drives and the like, the file we are indexing
            # is usually already on the device; when a file is being
            # moved onto the device, however, we must detect that case
            # and place the file ourselves
            proposed_name = props.get('filename', None)
            if not proposed_name:
                proposed_name = os.path.split(filelike.name)[1]
            # record the name before qualifying it to the store
            props['filename'] = proposed_name
            proposed_name = os.path.join(self.uri, proposed_name)

        self.indexmanager.index(props, filelike)

        if proposed_name:
            self._writeContent(uid, filelike, replace=True, target=proposed_name)
        
        
    def delete(self, uid):
        c = self.indexmanager.get(uid)
        path = c.get_property('filename', None)
        self.indexmanager.delete(uid)

        if path:
            path = os.path.join(self.uri, path)
            if os.path.exists(path):
                os.unlink(path)
        
    def stop(self):
        if self.walker and self.walker.isAlive():
            # XXX: just force the unmount, flush the index queue
            self._runWalker = False
        self.indexmanager.stop(force=True)

    def complete_indexing(self):
        if self.walker and self.walker.isAlive():
            self.walker.join()
        self.indexmanager.complete_indexing()