"""
xapianindex
~~~~~~~~~~~~~~~~~~~~
maintain indexes on content
"""
from __future__ import with_statement
__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
__docformat__ = 'restructuredtext'
__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
__license__ = 'The GNU Public License V2+'
from Queue import Queue, Empty
import logging
import re
import sys
import time
import thread
import threading
import warnings
import secore
from olpc.datastore import model
from olpc.datastore.converter import converter
from olpc.datastore.utils import create_uid, parse_timestamp_or_float
# Setup Logger
logger = logging.getLogger('org.sugar.datastore.xapianindex')
# Indexer Operations
CREATE = 1
UPDATE = 2
DELETE = 3
class ContentMappingIter(object):
    """An iterator over a set of results from a search."""

    def __init__(self, results, backingstore, model):
        self._results = results
        self._backingstore = backingstore
        self._iter = iter(results)
        self._model = model

    def __iter__(self):
        return self

    def next(self):
        searchresult = self._iter.next()
        return model.Content(searchresult, self._backingstore, self._model)
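
# Illustrative usage sketch (not executed; `im` stands for a
# hypothetical, already-connected IndexManager): search() wraps its raw
# results in a ContentMappingIter so callers iterate model.Content
# objects lazily:
#
#   results, count = im.search('cat')
#   for content in results:
#       print content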


class IndexManager(object):
    DEFAULT_DATABASE_NAME = 'index'

    def __init__(self, default_language='en'):
        # We maintain two connections to the database and trigger an
        # automatic flush of the read_index after any write operation.
        self.write_index = None
        self.read_index = None
        self.queue = Queue(0)
        self.indexer_running = False
        self.language = default_language
        self.backingstore = None
        self.fields = set()
        self._write_lock = threading.Lock()

    #
    # Initialization
    def connect(self, repo, **kwargs):
        if self.write_index is not None:
            warnings.warn('''Requested redundant connect to index''',
                          RuntimeWarning)

        self.repo = repo
        self.write_index = secore.IndexerConnection(repo)

        # configure the database according to the model
        datamodel = kwargs.get('model', model.defaultModel)
        datamodel.apply(self)
        # store a reference
        self.datamodel = datamodel

        self.read_index = secore.SearchConnection(repo)

        self.flush()

        # by default we start the indexer now
        self.startIndexer()
        assert self.indexer.isAlive()
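
    # Illustrative bring-up sketch (hypothetical path, default model):
    #
    #   im = IndexManager(default_language='en')
    #   im.connect('/tmp/test-index')
    #   ...index and search...
    #   im.stop()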

    def bind_to(self, backingstore):
        # signal from the backingstore that it's our parent
        self.backingstore = backingstore

    def stop(self):
        self.stopIndexer()
        self.write_index.close()
        self.read_index.close()

    # Index thread management
    def startIndexer(self):
        self.indexer_running = True
        self.indexer = threading.Thread(target=self.indexThread)
        self.indexer.setDaemon(True)
        self.indexer.start()

    def stopIndexer(self, force=False):
        if not self.indexer_running:
            return
        if not force:
            self.queue.join()
        self.indexer_running = False
        self.indexer.join()

    # flow control
    def flush(self):
        """Called after any database mutation"""
        with self._write_lock:
            self.write_index.flush()
            self.read_index.reopen()

    def enque(self, uid, vid, doc, operation, filestuff=None):
        # here we implement the sync/async policy:
        # create/update operations get their properties set right
        # away; the conversion/fulltext indexing can happen in the
        # indexer thread
        if operation in (CREATE, UPDATE):
            with self._write_lock:
                if operation == CREATE:
                    self.write_index.add(doc)
                    logger.info("created %s:%s" % (uid, vid))
                elif operation == UPDATE:
                    self.write_index.replace(doc)
                    logger.info("updated %s:%s" % (uid, vid))
            # flush outside the lock: flush() re-acquires
            # self._write_lock, which is not reentrant
            self.flush()
            # now change CREATE to UPDATE as we have already set the
            # properties
            operation = UPDATE

            if not filestuff:
                # In this case we are done
                return

        self.queue.put((uid, vid, doc, operation, filestuff))

    def indexThread(self):
        # process the queue
        # XXX: there is currently no way to remove items from the
        # queue, for example if a USB stick is added and quickly
        # removed. the mount should however get a stop() call, which
        # would request that the indexing finish
        while self.indexer_running:
            # include a timeout here to ease shutdown of the thread;
            # if this is a non-issue we can simply allow it to block
            try:
                data = self.queue.get(True, 0.025)
                uid, vid, doc, operation, filestuff = data
            except Empty:
                #time.sleep(1.0)
                continue

            try:
                with self._write_lock:
                    if operation == DELETE:
                        self.write_index.delete(uid)
                        logger.info("deleted content %s" % (uid,))
                    elif operation == UPDATE:
                        # Here we handle the conversion of binary
                        # documents to plain text for indexing. This is
                        # done in the thread to keep things async and
                        # latency lower.
                        # we know that there is filestuff or it
                        # wouldn't have been queued
                        filename, mimetype = filestuff
                        fp = converter(filename, mimetype)
                        if fp:
                            # read in at a fixed block size to
                            # conserve memory. If this doesn't work
                            # we can make doc.fields a generator
                            while True:
                                chunk = fp.read(2048)
                                if not chunk:
                                    break
                                doc.fields.append(
                                    secore.Field('fulltext', chunk))
                            self.write_index.replace(doc)
                            logger.info("updated file content %s:%s" %
                                        (uid, vid))
                        else:
                            logger.warning("Conversion process failed "
                                           "for document %s %s" %
                                           (uid, filename))
                    else:
                        logger.warning("Unknown indexer operation "
                                       "(%s: %s)" % (uid, operation))

                # tell the queue it's complete
                self.queue.task_done()

                # we flush after each record for now
                self.flush()
            except Exception:
                logger.exception("Error in indexer")

    def complete_indexing(self):
        """Intentionally block until the indexing is complete. Used
        primarily in testing.
        """
        self.queue.join()
        self.flush()

    #
    # Field management
    def addField(self, key, store=True, exact=False, sortable=False,
                 type='string', collapse=False,
                 **kwargs):
        language = kwargs.pop('language', self.language)

        xi = self.write_index.add_field_action

        if store:
            xi(key, secore.FieldActions.STORE_CONTENT)
        if exact:
            xi(key, secore.FieldActions.INDEX_EXACT)
        else:
            # weight -- int 1 or more
            # nopos -- don't include positional information
            # noprefix -- boolean
            xi(key, secore.FieldActions.INDEX_FREETEXT,
               language=language, **kwargs)

        if sortable:
            xi(key, secore.FieldActions.SORTABLE, type=type)
        if collapse:
            xi(key, secore.FieldActions.COLLAPSE)

        # track this to find missing field configurations
        self.fields.add(key)
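
    # Illustrative sketch (hypothetical field): the data model normally
    # drives these calls, but declaring an exact-match, sortable field
    # by hand would look like:
    #
    #   im.addField('mime_type', store=True, exact=True,
    #               sortable=True, type='string')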

    #
    # Index Functions
    def _mapProperties(self, props):
        """data normalization function: maps dicts of key:kind->value
        to Property objects
        """
        d = {}
        add_anything = False
        for k, v in props.iteritems():
            p, added = self.datamodel.fromstring(k, v,
                                                 allowAddition=True)
            if added is True:
                self.fields.add(p.key)
                add_anything = True
            d[p.key] = p

        if add_anything:
            with self._write_lock:
                self.datamodel.apply(self)

        return d

    def index(self, props, filename=None):
        """Index the content of an object.

        props maps property keys (optionally 'key:kind') to values;
        each entry is normalized to a Property object via the data
        model.
        """
        operation = UPDATE
        #
        # Version handling
        #
        # we implicitly create new versions of documents; the version
        # id should have been set by the higher level system
        uid = props.pop('uid', None)
        vid = props.pop('vid', None)

        if not uid:
            uid = create_uid()
            operation = CREATE

        if vid:
            vid = str(float(vid) + 1.0)
        else:
            vid = "1.0"

        # Property mapping via model
        props = self._mapProperties(props)
        doc = secore.UnprocessedDocument()
        add = doc.fields.append

        filestuff = None
        if filename:
            # enqueue async file processing
            # XXX: to make sure the file is kept around we could keep
            # an open fp?
            mimetype = props.get("mime_type")
            mimetype = mimetype and mimetype.value or 'text/plain'
            filestuff = (filename, mimetype)

        doc.id = uid
        add(secore.Field('vid', vid))

        #
        # Property indexing
        for k, prop in props.iteritems():
            value = prop.for_xapian

            if k not in self.fields:
                warnings.warn("Missing field configuration for %s" % k,
                              RuntimeWarning)
                continue

            add(secore.Field(k, value))

        # queue the document for processing
        self.enque(uid, vid, doc, operation, filestuff)
        return uid
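
    # Illustrative sketch (hypothetical values): stored properties are
    # written synchronously, while fulltext conversion of the file is
    # queued for the indexer thread:
    #
    #   uid = im.index({'title': 'shopping list',
    #                   'mime_type': 'text/plain'},
    #                  filename='/tmp/list.txt')
    #   im.complete_indexing()    # block until the queue drains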

    def get(self, uid):
        doc = self.read_index.get_document(uid)
        if not doc:
            raise KeyError(uid)
        return model.Content(doc, self.backingstore, self.datamodel)

    def delete(self, uid):
        # does this need queuing?
        # the higher level abstractions have to handle interaction
        # with versioning policy and so on
        self.enque(uid, None, None, DELETE)

    #
    # Search
    def search(self, query, start_index=0, end_index=4096):
        """Search the xapian store.

        query is a string defining the search in standard web search
        syntax, i.e. it contains a set of search terms. Each search
        term may be preceded by a "+" sign to indicate that the term
        is required, or a "-" to indicate that it is required to be
        absent.
        """
        ri = self.read_index
        if not query:
            q = self.read_index.query_all()
        elif isinstance(query, dict):
            queries = []
            q = query.pop('query', None)
            if q:
                queries.append(self.parse_query(q))
            if not query and not queries:
                # we emptied it
                q = self.read_index.query_all()
            else:
                # each term becomes part of the query join
                for k, v in query.iteritems():
                    if isinstance(v, dict):
                        # it might be a range scan
                        # this needs to be factored out
                        # and/or we need a client side lib that helps
                        # issue queries, because there are type
                        # conversion issues here
                        start = v.pop('start', 0)
                        end = v.pop('end', sys.maxint)
                        start = parse_timestamp_or_float(start)
                        end = parse_timestamp_or_float(end)
                        queries.append(ri.query_range(k, start, end))
                    elif isinstance(v, list):
                        # construct a set of OR queries
                        ors = []
                        for item in v:
                            ors.append(ri.query_field(k, item))
                        queries.append(ri.query_composite(ri.OP_OR, ors))
                    else:
                        queries.append(ri.query_field(k, v))

                q = ri.query_composite(ri.OP_AND, queries)
        else:
            q = self.parse_query(query)

        results = ri.search(q, start_index, end_index)
        count = results.matches_estimated

        # map the result set to model.Content items
        return ContentMappingIter(results, self.backingstore,
                                  self.datamodel), count
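
    # Illustrative sketch (hypothetical field names): the dict form
    # AND-joins field, list (OR) and range sub-queries:
    #
    #   results, count = im.search('cat')              # free text
    #   results, count = im.search({
    #       'mime_type': 'text/plain',                 # field match
    #       'author': ['alice', 'bob'],                # OR list
    #       'ctime': {'start': 0, 'end': 2000000000},  # range scan
    #   })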

    def get_uniquevaluesfor(self, property):
        # XXX: this is very sketchy code
        # try to get the searchconnection to support this directly
        # this should only apply to EXACT fields
        r = set()
        prefix = self.read_index._field_mappings.get_prefix(property)
        plen = len(prefix)
        termiter = self.read_index._index.allterms(prefix)
        for t in termiter:
            term = t.term
            if len(term) > plen:
                term = term[plen:]
                if term.startswith(':'):
                    term = term[1:]
                r.add(term)

        # r holds the textual representation of the field's value set.
        # if the type of the field or property needs conversion to a
        # different python type, this has to happen now
        descriptor = self.datamodel.fields.get(property)
        if descriptor:
            kind = descriptor[1]
            impl = model.propertyByKind(kind)
            r = set([impl.set(i) for i in r])

        return r
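
    # Illustrative sketch (assumes 'mime_type' is an EXACT field):
    #
    #   mimetypes = im.get_uniquevaluesfor('mime_type')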

    def parse_query(self, query):
        # accept standard web-query-like syntax
        # 'this' -- match this
        # 'this that' -- match this and that in document
        # '"this that"' -- match the exact phrase 'this that'
        # 'title:foo' -- match a document whose title contains 'foo'
        # 'title:"A tale of two datastores"' -- exact title match
        # '-this that' -- match that w/o this
        ri = self.read_index
        start = 0
        end = len(query)
        nextword = re.compile(r"(\S+)")
        endquote = re.compile(r'(")')
        queries = []

        while start < end:
            m = nextword.match(query, start)
            if not m:
                break
            orig = start
            field = None
            start = m.end() + 1
            word = m.group(1)
            if ':' in word:
                # see if it's a field match
                fieldname, w = word.split(':', 1)
                if fieldname in self.fields:
                    field = fieldname
                    word = w

            if word.startswith('"'):
                qm = endquote.search(query, start)
                if qm:
                    #XXX: strip quotes or not here
                    #word = query[orig+1:qm.end(1)-1]
                    word = query[orig:qm.end(1)]
                    start = qm.end(1) + 1

            if field:
                queries.append(ri.query_field(field, word))
            else:
                queries.append(ri.query_parse(word))

        q = ri.query_composite(ri.OP_AND, queries)
        return q