Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/gdatastore/index.py
diff options
context:
space:
mode:
Diffstat (limited to 'gdatastore/index.py')
-rw-r--r--gdatastore/index.py62
1 files changed, 56 insertions, 6 deletions
diff --git a/gdatastore/index.py b/gdatastore/index.py
index faa02c5..f60125f 100644
--- a/gdatastore/index.py
+++ b/gdatastore/index.py
@@ -219,9 +219,10 @@ class QueryParser(xapian.QueryParser):
class Index(object):
- def __init__(self, base_dir):
- self._base_dir = base_dir
+ def __init__(self, data_stores):
+ self._data_stores = data_stores
self._database = None
+ self._base_dir = data_stores[0]['index_dir']
if not os.path.exists(self._base_dir):
os.makedirs(self._base_dir)
@@ -248,6 +249,7 @@ class Index(object):
return True
def delete(self, object_id):
+ writable_db = self._get_writable_db()
object_id_term = _object_id_term(object_id)
if __debug__:
enquire = Enquire(self._database)
@@ -255,7 +257,9 @@ class Index(object):
documents = [hit.document for hit in enquire.get_mset(0, 2, 2)]
assert len(documents) == 1
- self._database.delete_document(object_id_term)
+ writable_db.delete_document(object_id_term)
+ writable_db.commit()
+ self._reopen_dbs()
def find(self, query_dict, query_string, options):
offset = options.pop('offset', 0)
@@ -320,8 +324,14 @@ class Index(object):
def retrieve(self, object_id):
postings = self._database.postlist(_object_id_term(object_id))
- document = self._database.get_document(postings.next().docid)
- return deserialise_metadata(document.get_data())
+ doc_id = postings.next().docid
+ document = self._database.get_document(doc_id)
+ # When using multiple databases, Xapian document IDs are
+ # interleaved:
+ # global_doc_id = (local_doc_id - 1) * num_databases + db_index + 1
+ ds_index = (doc_id - 1) % len(self._data_stores)
+ return {'metadata': deserialise_metadata(document.get_data()),
+ 'data_store': self._data_stores[ds_index]}
def store(self, object_id, properties):
logging.debug('store(%r, %r)', object_id, properties)
@@ -334,7 +344,10 @@ class Index(object):
term_generator = TermGenerator()
term_generator.index_document(document, properties)
assert (document.get_value(_VALUE_TREE_ID), document.get_value(_VALUE_VERSION_ID)) == object_id
- self._database.replace_document(id_term, document)
+ writable_db = self._get_writable_db()
+ writable_db.replace_document(id_term, document)
+ writable_db.commit()
+ self._reopen_dbs()
def _create_database(self):
database = WritableDatabase(self._base_dir, xapian.DB_CREATE_OR_OPEN)
@@ -359,6 +372,43 @@ class Index(object):
raise DSIndexError('Unsupported index version: %d > %d' %
(version, _CURRENT_VERSION))
+ self._reopen_dbs()
+
+ def _get_writable_db(self):
+ """Return a writable database instance
+
+ In regular single-database mode, just return the single database
+ instance. In multi-database mode, return a new writable database
+ instance.
+ """
+ if len(self._data_stores) == 1:
+ return self._database
+ return WritableDatabase(self._base_dir, xapian.DB_OPEN)
+
+ def _reopen_dbs(self):
+ """Reopen databases if necessary
+
+ In regular single-database, do nothing. In multi-database mode,
+ re-open all databases to ensure we use the latest versions of all
+ databases.
+
+ Invoke this when either we have updated the primary database
+ ourselves or the additional databases may have been updated by
+ some other process.
+ """
+ if len(self._data_stores) == 1:
+ return
+ self._database = xapian.Database(self._base_dir)
+ for db_info in self._data_stores[1:]:
+ db = xapian.Database(db_info['index_dir'])
+ version = int(db.get_metadata('gdatastore_version'))
+ if version != _CURRENT_VERSION:
+ logging.warning('Skipping extra index %r due to version'
+ ' mismatch', db_info['index_dir'])
+ continue
+
+ self._database.add_database(db)
+
def deserialise_metadata(serialised):
"""Deserialise a string generated by serialise_metadata().