Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/src/olpc/datastore/xapianindex.py
blob: 1faa6a99225de78c8d7a1afeb0373f846e24e894 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
""" 
xapianindex
~~~~~~~~~~~~~~~~~~~~
maintain indexes on content

""" 
from __future__ import with_statement

__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
__docformat__ = 'restructuredtext'
__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
__license__  = 'The GNU Public License V2+'



from Queue import Queue, Empty
import gc
import logging
import os
import re
import sys
import time
import thread
import threading
import warnings

import secore
import xapian as _xapian # we need to modify the QueryParser

from olpc.datastore import model 
from olpc.datastore.converter import converter
from olpc.datastore.utils import create_uid, parse_timestamp_or_float


# Module-wide logger; every log call in this file goes through it.
logger = logging.getLogger('org.sugar.datastore.xapianindex')

# Indexer operations: the ``operation`` argument accepted by
# IndexManager.enque() and dispatched on by IndexManager.indexThread().
CREATE = 1
UPDATE = 2
DELETE = 3

# Tag modes: third element of the tuples built by
# IndexManager._parse_tags() and consumed by IndexManager.tag().
# NOTE: these share numeric values with CREATE/UPDATE above, so the two
# groups of constants must not be compared against each other.
ADD = 1
REMOVE = 2


class ContentMappingIter(object):
    """Iterator adapter that turns raw search results into Content objects.

    Every item pulled from the wrapped result set is handed to
    ``model.Content`` together with the backingstore and data model
    given at construction time.
    """

    def __init__(self, results, backingstore, model):
        self._model = model
        self._backingstore = backingstore
        self._results = results
        # one iterator over the results, advanced by next()
        self._iter = iter(self._results)

    def __iter__(self):
        # the object is its own iterator
        return self

    def next(self):
        """Return the next search hit wrapped as a ``model.Content``.

        StopIteration raised by the underlying iterator propagates
        unchanged, which ends iteration.
        """
        hit = self._iter.next()
        return model.Content(hit, self._backingstore, self._model)


class IndexManager(object):
    DEFAULT_DATABASE_NAME = 'index'
    
    def __init__(self, default_language='en'):
        # We will maintain two connections to the database
        # we trigger automatic flushes to the read_index
        # after any write operation        
        self.write_index = None
        self.read_index = None
        self.queue = Queue(0)
        self.indexer_running = False
        self.language = default_language

        self.backingstore = None
        
        self.fields = set()
        self._write_lock = threading.Lock()
    #
    # Initialization
    def connect(self, repo, **kwargs):
        """Open the index stored at ``repo`` and start the indexer thread.

        An optional ``model`` keyword selects the data model whose
        configuration is applied to this manager; it defaults to
        ``model.defaultModel``.  When a write connection already
        exists a RuntimeWarning is issued and the connection is made
        again regardless.
        """
        already_connected = self.write_index is not None
        if already_connected:
            warnings.warn('''Requested redundant connect to index''',
                          RuntimeWarning)

        self.repo = repo
        self.write_index = secore.IndexerConnection(repo)

        # let the model configure the freshly opened database, then
        # remember which model is in use
        chosen_model = kwargs.get('model', model.defaultModel)
        chosen_model.apply(self)
        self.datamodel = chosen_model

        self.read_index = secore.SearchConnection(repo)
        self.flush()

        # indexing starts immediately by default
        self.startIndexer()
        assert self.indexer.isAlive()

                
    def bind_to(self, backingstore):
        """Record ``backingstore`` as the parent of this index.

        The stored reference is what the ``versions`` and ``inplace``
        properties consult, and it is passed to every ``model.Content``
        built from search results.
        """
        # signal from the backingstore that it is our parent
        self.backingstore = backingstore

    
    def stop(self, force=False):
        self.stopIndexer(force)
        self.write_index.close()
        self.read_index.close()
        # XXX: work around for xapian not having close() this will
        # change in the future in the meantime we delete the
        # references to the indexers and then force the gc() to run
        # which should inturn trigger the C++ destructor which forces
        # the database shut.
        self.write_index = None
        self.read_index = None
        gc.collect()
        
    # Index thread management
    def startIndexer(self):
        self.indexer_running = True
        self.indexer = threading.Thread(target=self.indexThread)
        self.indexer.setDaemon(True)
        self.indexer.start()
        
    def stopIndexer(self, force=False):
        if not self.indexer_running: return 
        if not force: self.queue.join()
        self.indexer_running = False
        # should terminate after the current task
        self.indexer.join()

    # flow control
    def flush(self):
        """Called after any database mutation"""
        with self._write_lock:
            self.write_index.flush()
            self.read_index.reopen()

    def enqueSequence(self, commands):
        """Takes a sequence of arugments to the normal enque function
        and executes them under a single lock/flush cycle
        """
        self.queue.put(commands)
        
    def enque(self, uid, vid, doc, operation, filestuff=None):
        """Apply an index operation, deferring file conversion to the thread.

        This implements the sync/async policy:

        * CREATE / UPDATE: the document (its properties) is written
          and flushed immediately.  If ``filestuff`` -- a
          ``(filename, mimetype)`` pair -- is given, an UPDATE is then
          queued so the indexer thread can convert the file and add
          its full text.
        * DELETE: performed and flushed immediately; nothing is queued.
        * anything else: queued unchanged (the indexer thread logs a
          warning for operations it does not know).

        The operation constants are plain ints, so they are compared
        with ``==``.  The outer membership test already uses equality;
        testing identity inside it would let an equal-but-not-identical
        value skip the write while still being flushed and re-queued.
        """
        if operation in (CREATE, UPDATE):
            with self._write_lock:
                if operation == CREATE:
                    self.write_index.add(doc)
                    logger.info("created %s:%s" % (uid, vid))
                else:
                    self.write_index.replace(doc)
                    logger.info("updated %s:%s" % (uid, vid))

            # flush() takes the write lock itself, so call it only
            # after the lock has been released
            self.flush()
            if not filestuff:
                # properties only -- nothing left for the thread to do
                return
            # the properties are already stored, so what remains for
            # the thread is an update even if this started as a create
            operation = UPDATE
        elif operation == DELETE:
            # deletes are synchronous
            with self._write_lock:
                self.write_index.delete(uid)
                logger.info("deleted content %s" % (uid,))
            self.flush()
            return

        self.queue.put((uid, vid, doc, operation, filestuff))

    def indexThread(self):
        """Worker loop: process queued commands until told to stop.

        Runs until ``self.indexer_running`` becomes false.  Each
        dequeued entry is either one ``(uid, vid, doc, operation,
        filestuff)`` tuple or a sequence of such tuples (see
        enqueSequence()); a sequence is processed under a single
        acquisition of the write lock.

        Every dequeued entry is acknowledged with ``task_done()``
        whether or not processing succeeded, so that ``queue.join()``
        (used by stopIndexer() and complete_indexing()) cannot block
        forever after a failure.  Failures are logged and the loop
        carries on with the next entry.
        """
        # XXX: there is currently no way to remove items from the queue
        # for example if a USB stick is added and quickly removed
        # the mount should however get a stop() call which would
        # request that the indexing finish
        # XXX: we can in many cases index, not from the tempfile but
        # from the item in the repo as that will become our immutable
        # copy. Detect those cases and use the internal filename
        # property or backingstore._translatePath to get at it
        q = self.queue
        while self.indexer_running:
            try:
                # the timeout lets the loop re-check indexer_running
                # regularly, which eases shutdown of the thread
                data = q.get(True, 0.025)
            except Empty:
                continue

            try:
                # A lone command is a flat tuple; wrap it so both
                # shapes are handled by the same loop.  An empty batch
                # is left alone and simply yields no iterations.
                if data and not isinstance(data[0], (list, tuple)):
                    data = (data,)

                with self._write_lock:
                    for item in data:
                        self._process_queue_item(item)

                # no flush here: see complete_indexing()
            except Exception:
                # keep the worker alive; the rest of a failed batch is
                # abandoned
                logger.exception("Error in indexer")
            finally:
                # tell the queue this entry is complete, even on error
                q.task_done()

    def _process_queue_item(self, item):
        """Apply one queued command; the caller must hold the write lock."""
        uid, vid, doc, operation, filestuff = item

        if operation == DELETE:
            self.write_index.delete(uid)
            logger.info("deleted content %s" % (uid,))
            return

        if operation != UPDATE:
            logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation))
            return

        # Here we handle the conversion of binary documents to plain
        # text for indexing.  This is done in the thread to keep
        # things async and latency lower.  There is always filestuff
        # here, or enque() would not have queued the command.
        filename, mimetype = filestuff
        if isinstance(filename, file):
            filename = filename.name
        if filename and not os.path.exists(filename):
            # someone removed the file before indexing, or the path
            # is broken; conversion is still attempted below
            logger.warning("Expected file for"
                           " indexing at %s. Not"
                           " Found" % filename)

        fp = converter(filename, mimetype=mimetype)
        if not fp:
            logger.debug("""Conversion process failed for document %s %s""" % (uid, filename))
            return

        try:
            # feed the converted text to the document in fixed-size
            # chunks, each as its own 'fulltext' field
            while True:
                chunk = fp.read(2048)
                if not chunk:
                    break
                doc.fields.append(secore.Field('fulltext', chunk))
        finally:
            # release the converter's file object when it offers a
            # close() (assumed to be file-like -- it is read() above)
            close = getattr(fp, 'close', None)
            if close is not None:
                close()

        self.write_index.replace(doc)

        # Capabilities are read here rather than once at thread start
        # so that a backingstore bound after the thread began is
        # honoured.
        if self.versions and not self.inplace:
            # we know the source file is ours to remove
            os.unlink(filename)

        logger.info("update file content %s:%s" % (uid, vid))
                

    def complete_indexing(self):
        """Intentionally block until the indexing is complete. Used
        primarily in testing.
        """
        self.queue.join()
        self.flush()
    
    #
    # Field management
    def addField(self, key, store=True, exact=False, sortable=False,
                 type='string', collapse=False,
                 **kwargs):
        """Register the field actions for ``key`` on the write connection.

        store    -- keep the field content (STORE_CONTENT)
        exact    -- index for exact matching (INDEX_EXACT); when false
                    the field is indexed as free text instead
        sortable -- allow sorting on the field, using ``type``
        collapse -- register the COLLAPSE action

        A ``language`` keyword overrides ``self.language`` for
        free-text indexing; any other keywords are forwarded to the
        free-text action only (e.g. weight, nopos, noprefix).
        """
        actions = secore.FieldActions
        language = kwargs.pop('language', self.language)
        register = self.write_index.add_field_action

        if store:
            register(key, actions.STORE_CONTENT)

        if not exact:
            # weight -- int 1 or more
            # nopos  -- don't include positional information
            # noprefix -- boolean
            register(key, actions.INDEX_FREETEXT, language=language, **kwargs)
        else:
            register(key, actions.INDEX_EXACT)

        if sortable:
            register(key, actions.SORTABLE, type=type)

        if collapse:
            register(key, actions.COLLAPSE)

        # remembered so that index() can spot properties that have no
        # field configuration
        self.fields.add(key)

    #
    # Index Functions
    def _mapProperties(self, props):
        """data normalization function, maps dicts of key:kind->value
        to Property objects
        """
        d = {}
        add_anything = False
        for k,v in props.iteritems():
            p, added = self.datamodel.fromstring(k, v,
                                                 allowAddition=True)
            if added is True:
                self.fields.add(p.key)
                add_anything = True
            d[p.key] = p

        if add_anything:
            with self._write_lock:
                self.datamodel.apply(self)
            
        return d

    @property
    def versions(self):
        if self.backingstore:
            return "versions" in self.backingstore.capabilities
        return False

    @property
    def inplace(self):
        if self.backingstore:
            return "inplace" in self.backingstore.capabilities
        return False

    def _parse_tags(self, tags):
        # convert tags into (TAG, rev) pairs indicating if this tag
        # applies to this rev or all revs
        # all revs is ('tag', False)
        # the specific rev (unknown in this function is ('tag', True)
        t = tags.lower().split()
        r = []
        for tag in t:
            all = True
            mode = ADD
            if tag.startswith("-"):
                tag = tag[1:]
                mode = REMOVE
                
            if tag[-2:] == ":0":
                tag = tag[:-2]
                
            r.append((tag, all, mode))
                       
        return r
    
    def tag(self, uid, tags, rev=None):
        """Add or remove tags on the indexed documents of ``uid``.

        ``tags`` is a whitespace separated string as understood by
        _parse_tags(): ``name`` adds a tag, ``-name`` removes it and a
        bare ``-`` removes every tag.  ``rev`` is passed to
        get_by_uid_prop() to narrow the affected documents.

        The existing documents are modified and replaced in the index
        (no new versions are added) and the index is flushed
        afterwards so the change is visible to searches.

        Raises KeyError when no document matches ``uid``; this method
        cannot create items.
        """
        results, count = self.get_by_uid_prop(uid, rev)
        if count == 0:
            raise KeyError('unable to apply tags to uid %s' % uid)

        # pull the whole version chain
        results = list(results)

        for tag, _all, mode in self._parse_tags(tags):
            # _parse_tags() marks every tag as applying to all
            # revisions and per-revision selection is not implemented,
            # so the full result set is used in either case.
            used = results

            if not tag and mode == REMOVE:
                # special case: a bare '-' removes all tags
                for c in used:
                    if 'tags' in c._doc.data:
                        del c._doc.data['tags']
                continue

            for c in used:
                # XXX: this should really be model driven and support
                # any field that is of the tags type...
                existing = set(c.get_property('tags', []))
                if mode == REMOVE:
                    # removing a tag that is not present is a no-op;
                    # it must not end up adding the tag
                    existing.discard(tag)
                else:
                    existing.add(tag)

                # XXX: low level interface busting
                # replace the current tags with the updated set
                c._doc.data['tags'] = list(existing)

        # Sync version, (enque with update for async).  The writer is
        # given the underlying document through replace(), the same
        # call the other write paths in this class use.
        with self._write_lock:
            for c in results:
                self.write_index.replace(c._doc)

        # flush() takes the write lock itself, so it runs after the
        # lock above has been released
        self.flush()
        
        
    def index(self, props, filename=None):
        """Index the properties (and optionally file content) of an object.

        ``props`` maps property names to values; it is normalised
        through _mapProperties().  A missing or empty ``uid`` means a
        new object: a uid is generated and the operation is a CREATE,
        otherwise it is an UPDATE.  Note that ``uid`` is popped from,
        and ``uid``/``vid`` are written back into, the passed dict.

        When ``filename`` is given, its absolute path and the
        ``mime_type`` property (default 'text/plain') are handed to
        enque() so the file content is indexed asynchronously.

        Returns the id of the indexed document: the uid itself on a
        non-versioning store, a freshly created id on a versioning
        one.
        """
        #
        # Version handling
        #
        uid = props.pop('uid', None)
        if uid:
            operation = UPDATE
        else:
            operation = CREATE
            uid = create_uid()

        doc = secore.UnprocessedDocument()

        vid = None
        if not self.versions:
            doc.id = uid
        else:
            vid = props.get("vid")
            if not vid:
                logger.warn("Didn't supply needed versioning information"
                            " on a backingstore which performs versioning")
            # each version's id is unique when using a versioning store
            doc.id = create_uid()

        vid = vid or '1'

        # on non-versioning stores this is redundant, but on
        # versioning stores uid refers to the object's whole timeline
        props['uid'] = uid
        props['vid'] = vid

        # Property mapping via model
        props = self._mapProperties(props)

        filestuff = None
        if filename:
            # enque async file processing
            # XXX: to make sure the file is kept around we could keep
            # an open fp?
            mime_prop = props.get("mime_type")
            mimetype = None
            if mime_prop:
                mimetype = mime_prop.value
            if not mimetype:
                mimetype = 'text/plain'

            filestuff = (os.path.abspath(filename), mimetype)

        #
        # Property indexing
        for name, prop in props.iteritems():
            value = prop.for_xapian

            if name in self.fields:
                doc.fields.append(secore.Field(name, value))
            else:
                warnings.warn("""Missing field configuration for %s""" % name,
                              RuntimeWarning)

        # queue the document for processing
        self.enque(uid, vid, doc, operation, filestuff)

        return doc.id

    def get(self, uid):
        doc = self.read_index.get_document(uid)
        if not doc: raise KeyError(uid)
        return model.Content(doc, self.backingstore, self.datamodel)

    def get_by_uid_prop(self, uid, rev=None):
        # versioning stores fetch objects by uid
        # when rev is passed only that particular rev is returne
        ri =  self.read_index
        q = ri.query_field('uid', uid)
        if rev is not None:
            if rev == "tip":
                rev = self.backingstore.tip(uid)
                
            q = ri.query_filter(q, ri.query_field('vid', str(rev)))
        results, count = self._search(q, 0, 1000, sortby="-vid")
        
        return results, count
        
        
    
    def delete(self, uid):
        """Remove ``uid`` from the index via a DELETE operation.

        Interaction with versioning policy and the like is left to
        the higher level abstractions.
        """
        # does this need queuing?
        self.enque(uid, vid=None, doc=None, operation=DELETE)
        
    #
    # Search
    def search(self, query, start_index=0, end_index=4096, order_by=None):
        """search the xapian store.
        query is a string defining the serach in standard web search syntax.

        ie: it contains a set of search terms.  Each search term may be
        preceded by a "+" sign to indicate that the term is required, or a "-"
        to indicate that is is required to be absent.
        """
        ri = self.read_index

        if not query:
            q = self.read_index.query_all()
        elif isinstance(query, dict):
            queries = []
            q = query.pop('query', None)
            if q:
                queries.append(self.parse_query(q))
            if not query and not queries:
                # we emptied it 
                q = self.read_index.query_all()
            else:
                # each term becomes part of the query join
                for k, v in query.iteritems():
                    if isinstance(v, dict):
                        # it might be a range scan
                        # this needs to be factored out
                        # and/or we need client side lib that helps
                        # issue queries because there are type
                        # conversion issues here
                        start = v.pop('start', 0)
                        end = v.pop('end', sys.maxint)
                        start = parse_timestamp_or_float(start)
                        end = parse_timestamp_or_float(end)
                        queries.append(ri.query_range(k, start, end))
                    elif isinstance(v, list):
                        # construct a set of OR queries
                        ors = []
                        for item in v: ors.append(ri.query_field(k, item))
                        queries.append(ri.query_composite(ri.OP_OR, ors))
                    else:
                        queries.append(ri.query_field(k, v))
                        
                q = ri.query_composite(ri.OP_AND, queries)
        else:
            q = self.parse_query(query)

        if order_by and isinstance(order_by, list):
            # secore only handles a single item, not a multilayer sort
            order_by = order_by[0]
            
        return self._search(q, start_index, end_index, sortby=order_by)
    
    def _search(self, q, start_index, end_index, sortby=None):
        """Run ``q`` against the read index.

        ``start_index`` and ``end_index`` are coerced to int.
        ``sortby`` is a field name (optionally prefixed with "-") or
        None / empty for no sorting.

        Returns ``(iterator, count)``: an iterator yielding
        ``model.Content`` objects for the hits, and the estimated
        number of matches.
        """
        start_index = int(start_index)
        end_index = int(end_index)

        # A given sort key is normalised to a plain str (presumably
        # because callers may pass str/unicode subclasses -- TODO
        # confirm).  "No sorting" must stay None: str(None) would ask
        # for a sort on a field literally named 'None'.
        if sortby:
            sortby = str(sortby)
        else:
            sortby = None

        results = self.read_index.search(q, start_index, end_index, sortby=sortby)
        count = results.matches_estimated

        # map the result set to model.Content items
        return ContentMappingIter(results, self.backingstore, self.datamodel), count


    def get_uniquevaluesfor(self, property):
        # XXX: this is very sketchy code
        # try to get the searchconnection to support this directly
        # this should only apply to EXACT fields
        r = set()
        prefix = self.read_index._field_mappings.get_prefix(property)
        plen = len(prefix)
        termiter = self.read_index._index.allterms(prefix)
        for t in termiter:
            term = t.term
            if len(term) > plen:
                term = term[plen:]
                if term.startswith(':'): term = term[1:]
                r.add(term)

        # r holds the textual representation of the fields value set
        # if the type of field or property needs conversion to a
        # different python type this has to happen now
        descriptor = self.datamodel.fields.get(property)
        if descriptor:
            kind = descriptor[1]
            impl = model.propertyByKind(kind)
            r = set([impl.set(i) for i in r])
        
        return r
                                                         
    def parse_query(self, query):
        """Turn a web-style query string into a composite AND query.

        Accepted syntax:

        * ``this``           -- match this
        * ``this that``      -- match this and that in the document
        * ``"this that"``    -- match the exact phrase 'this that'
        * ``title:foo``      -- match documents whose title contains foo
        * ``title:"A tale of two datastores"`` -- exact title match
        * ``-this that``     -- match that without this
        * ``foo*``           -- limited wildcard support

        Terms may be separated by any amount of whitespace.  A phrase
        runs from its opening quote to the next quote after it, and is
        passed on with its quotes.  For ``field:...`` terms whose
        field is known (in ``self.fields``) only the part after the
        colon is used as the value to match; for an unknown prefix the
        part after the colon is parsed as a plain term.

        The phrase and wildcard parser flags, once switched on by a
        term, stay on for the terms that follow.
        """
        qp = _xapian.QueryParser
        flags = qp.FLAG_LOVEHATE

        ri = self.read_index
        nextword = re.compile(r"(\S+)")
        queries = []
        pos = 0
        end = len(query)
        while pos < end:
            # search() rather than match(): skip any run of whitespace
            # in front of the next term instead of giving up on it
            m = nextword.search(query, pos)
            if not m:
                break
            word = m.group(1)
            word_start = m.start(1)
            pos = m.end(1) + 1

            field = None
            if ':' in word:
                # see if it is a field match
                fieldname, word = word.split(':', 1)
                if fieldname in self.fields:
                    field = fieldname
                # the term's text now begins after "fieldname:"
                word_start += len(fieldname) + 1

            if word.startswith('"'):
                # Look for the closing quote right after the opening
                # one, so that a one-word phrase ends at its own quote.
                closing = query.find('"', word_start + 1)
                if closing != -1:
                    # XXX: strip quotes or not here
                    word = query[word_start:closing + 1]
                    # this is a phrase; modify the flags
                    flags |= qp.FLAG_PHRASE
                    # continue after the quote and one separator
                    pos = closing + 2

            if field:
                queries.append(ri.query_field(field, word))
            else:
                if word.endswith("*"):
                    flags |= qp.FLAG_WILDCARD
                queries.append(self._query_parse(word, flags))

        return ri.query_composite(ri.OP_AND, queries)



    def _query_parse(self, word, flags=0, op=None):
        # while newer secore do pass flags it doesn't allow control
        # over them at the API level. We override here to support
        # adding wildcard searching
        ri = self.read_index
        if op is None: op = ri.OP_AND
        qp = ri._prepare_queryparser(None, None, op)
        try:
            return qp.parse_query(word, flags)
        except _xapian.QueryParserError, e:
            # If we got a parse error, retry without boolean operators (since
            # these are the usual cause of the parse error).
            return qp.parse_query(string, 0)