diff options
-rw-r--r-- | gdatastore/datastore.py | 40 | ||||
-rw-r--r-- | gdatastore/index.py | 62 |
2 files changed, 84 insertions, 18 deletions
diff --git a/gdatastore/datastore.py b/gdatastore/datastore.py index 9350209..6dd2170 100644 --- a/gdatastore/datastore.py +++ b/gdatastore/datastore.py @@ -513,7 +513,18 @@ class InternalApi(object): self._max_versions = gconf_client.get_int( '/desktop/sugar/datastore/max_versions') logging.debug('max_versions=%r', self._max_versions) - self._index = Index(os.path.join(self._base_dir, 'index')) + data_stores = [{'name': '', + 'index_dir': os.path.join(self._base_dir, 'index'), + 'git_dir': os.path.join(self._base_dir, 'git')}] + extras_dir = os.path.join(self._base_dir, 'extra-stores') + if not os.path.isdir(extras_dir): + os.makedirs(extras_dir) + for extra_name in os.listdir(extras_dir): + extra_dir = os.path.realpath(os.path.join(extras_dir, extra_name)) + data_stores.append({'name': extra_name, + 'index_dir': os.path.join(extra_dir, 'index'), + 'git_dir': os.path.join(extra_dir, 'git')}) + self._index = Index(data_stores) self._migrate() self._check_reindex() logging.info('ready') @@ -528,7 +539,7 @@ class InternalApi(object): logging.debug('change_metadata(%r, %r)', object_id, metadata) metadata['tree_id'], metadata['version_id'] = object_id if 'creation_time' not in metadata: - old_metadata = self._index.retrieve(object_id) + old_metadata = self._index.retrieve(object_id)['metadata'] metadata['creation_time'] = old_metadata['creation_time'] self._index.store(object_id, metadata) @@ -543,18 +554,21 @@ class InternalApi(object): def get_data_path(self, (tree_id, version_id), sender=None): logging.debug('get_data_path((%r, %r), %r)', tree_id, version_id, sender) - metadata = self._index.retrieve((tree_id, version_id)) + entry = self._index.retrieve((tree_id, version_id)) + metadata = entry['metadata'] + git_dir = entry['data_store']['git_dir'] ref_name = _format_ref(tree_id, version_id) top_level_entries = self._git_call('ls-tree', - [ref_name]).splitlines() + [ref_name], + git_dir=git_dir).splitlines() if len(top_level_entries) == 1 and \ top_level_entries[0].endswith('\tdata'): blob_hash = top_level_entries[0].split('\t')[0].split(' ')[2] mime_type = metadata.get('mime_type', '') - return self._checkout_file(blob_hash, + return self._checkout_file(blob_hash, git_dir, suffix=_guess_extension(mime_type)) - return self._checkout_dir(ref_name) + return self._checkout_dir(ref_name, git_dir) def find(self, query_dict, options, query_string=None): logging.debug('find(%r, %r, %r)', query_dict, options, query_string) @@ -579,7 +593,7 @@ class InternalApi(object): return self._index.find_unique_values(name) def get_properties(self, object_id): - return self._index.retrieve(object_id) + return self._index.retrieve(object_id)['metadata'] def save(self, tree_id, parent_id, metadata, path, delete_after, async_cb, async_err_cb, allow_new_parent=False): @@ -676,15 +690,16 @@ class InternalApi(object): for entry in old_versions: self.delete((entry['tree_id'], entry['version_id'])) - def _checkout_file(self, blob_hash, suffix=''): + def _checkout_file(self, blob_hash, git_dir, suffix=''): fd, file_name = tempfile.mkstemp(dir=self._checkouts_dir, suffix=suffix) try: - self._git_call('cat-file', ['blob', blob_hash], stdout_fd=fd) + self._git_call('cat-file', ['blob', blob_hash], git_dir=git_dir, + stdout_fd=fd) finally: os.close(fd) return file_name - def _checkout_dir(self, ref_name): + def _checkout_dir(self, ref_name, git_dir): # FIXME return '' @@ -758,7 +773,8 @@ class InternalApi(object): return str(uuid.uuid4()) def _git_call(self, command, args=None, input=None, input_fd=None, - stdout_fd=None, work_dir=None, index_path=None): + stdout_fd=None, work_dir=None, index_path=None, + git_dir=None): env = dict(self._git_env) if work_dir: env['GIT_WORK_TREE'] = work_dir @@ -767,7 +783,7 @@ class InternalApi(object): logging.debug('calling git %s, env=%r', ['git', command] + (args or []), env) pipe = Popen(['git', command] + (args or []), stdin=input_fd or PIPE, stdout=stdout_fd or PIPE, stderr=PIPE, close_fds=True, - cwd=self._git_dir, env=env) + cwd=git_dir or self._git_dir, env=env) stdout, stderr = pipe.communicate(input) if pipe.returncode: raise GitError(pipe.returncode, stderr) diff --git a/gdatastore/index.py b/gdatastore/index.py index faa02c5..f60125f 100644 --- a/gdatastore/index.py +++ b/gdatastore/index.py @@ -219,9 +219,10 @@ class QueryParser(xapian.QueryParser): class Index(object): - def __init__(self, base_dir): - self._base_dir = base_dir + def __init__(self, data_stores): + self._data_stores = data_stores self._database = None + self._base_dir = data_stores[0]['index_dir'] if not os.path.exists(self._base_dir): os.makedirs(self._base_dir) @@ -248,6 +249,7 @@ class Index(object): return True def delete(self, object_id): + writable_db = self._get_writable_db() object_id_term = _object_id_term(object_id) if __debug__: enquire = Enquire(self._database) @@ -255,7 +257,9 @@ class Index(object): documents = [hit.document for hit in enquire.get_mset(0, 2, 2)] assert len(documents) == 1 - self._database.delete_document(object_id_term) + writable_db.delete_document(object_id_term) + writable_db.commit() + self._reopen_dbs() def find(self, query_dict, query_string, options): offset = options.pop('offset', 0) @@ -320,8 +324,14 @@ class Index(object): def retrieve(self, object_id): postings = self._database.postlist(_object_id_term(object_id)) - document = self._database.get_document(postings.next().docid) - return deserialise_metadata(document.get_data()) + doc_id = postings.next().docid + document = self._database.get_document(doc_id) + # When using multiple databases, Xapian document IDs are + # interleaved: + # global_doc_id = (local_doc_id - 1) * num_databases + db_index + 1 + ds_index = (doc_id - 1) % len(self._data_stores) + return {'metadata': deserialise_metadata(document.get_data()), + 'data_store': self._data_stores[ds_index]} def store(self, object_id, properties): logging.debug('store(%r, %r)', object_id, properties) @@ -334,7 +344,10 @@ class Index(object): term_generator = TermGenerator() term_generator.index_document(document, properties) assert (document.get_value(_VALUE_TREE_ID), document.get_value(_VALUE_VERSION_ID)) == object_id - self._database.replace_document(id_term, document) + writable_db = self._get_writable_db() + writable_db.replace_document(id_term, document) + writable_db.commit() + self._reopen_dbs() def _create_database(self): database = WritableDatabase(self._base_dir, xapian.DB_CREATE_OR_OPEN) @@ -359,6 +372,43 @@ class Index(object): raise DSIndexError('Unsupported index version: %d > %d' % (version, _CURRENT_VERSION)) + self._reopen_dbs() + + def _get_writable_db(self): + """Return a writable database instance + + In regular single-database mode, just return the single database + instance. In multi-database mode, return a new writable database + instance. + """ + if len(self._data_stores) == 1: + return self._database + return WritableDatabase(self._base_dir, xapian.DB_OPEN) + + def _reopen_dbs(self): + """Reopen databases if necessary + + In regular single-database, do nothing. In multi-database mode, + re-open all databases to ensure we use the latest versions of all + databases. + + Invoke this when either we have updated the primary database + ourselves or the additional databases may have been updated by + some other process. + """ + if len(self._data_stores) == 1: + return + self._database = xapian.Database(self._base_dir) + for db_info in self._data_stores[1:]: + db = xapian.Database(db_info['index_dir']) + version = int(db.get_metadata('gdatastore_version')) + if version != _CURRENT_VERSION: + logging.warning('Skipping extra index %r due to version' + ' mismatch', db_info['index_dir']) + continue + + self._database.add_database(db) + def deserialise_metadata(serialised): """Deserialise a string generated by serialise_metadata(). |