Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSascha Silbe <sascha@silbe.org>2009-08-17 11:30:12 (GMT)
committer Sascha Silbe <sascha@silbe.org>2009-08-17 11:30:12 (GMT)
commit0cedda85d11be006638e572c0ab2d4f570ec18e3 (patch)
tree5d674ae6a1d03c11ab3b70e8cf9874e64007e8b6
parent6b679e122c2a45e29b61287eef3c083822535f49 (diff)
version support single patch (6b679e122c2a45e29b61287eef3c083822535f49..dee44acbb76fd9e5b3226646bbf5c041f8890497)onlyversions-single
-rw-r--r--src/carquinyol/datastore.py322
-rw-r--r--src/carquinyol/filestore.py77
-rw-r--r--src/carquinyol/indexstore.py108
-rw-r--r--src/carquinyol/layoutmanager.py20
-rw-r--r--src/carquinyol/metadatareader.c24
-rw-r--r--src/carquinyol/metadatastore.py38
-rw-r--r--src/carquinyol/migration.py137
-rw-r--r--src/carquinyol/optimizer.py62
8 files changed, 431 insertions, 357 deletions
diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py
index a118e03..86757aa 100644
--- a/src/carquinyol/datastore.py
+++ b/src/carquinyol/datastore.py
@@ -22,9 +22,12 @@ import os
import traceback
import dbus
+import dbus.service
import gobject
from sugar import mime
+import sugar.datastore
+from sugar.logger import trace
from carquinyol import layoutmanager
from carquinyol import migration
@@ -56,7 +59,12 @@ class DataStore(dbus.service.Object):
layout_manager = layoutmanager.get_instance()
if layout_manager.get_version() == 0:
migration.migrate_from_0()
- layout_manager.set_version(1)
+ layout_manager.set_version(2)
+ layout_manager.index_updated = False
+
+ if layout_manager.get_version() == 1:
+ migration.migrate_from_1()
+ layout_manager.set_version(2)
layout_manager.index_updated = False
self._metadata_store = MetadataStore()
@@ -80,120 +88,136 @@ class DataStore(dbus.service.Object):
self._optimizer = Optimizer(self._file_store, self._metadata_store)
def _rebuild_index(self):
- uids = layoutmanager.get_instance().find_all()
- logging.debug('Going to update the index with uids %r' % uids)
- gobject.idle_add(lambda: self.__rebuild_index_cb(uids),
+ tvids = layoutmanager.get_instance().find_all()
+ logging.debug('Going to update the index with tvids %r' % tvids)
+ gobject.idle_add(lambda: self.__rebuild_index_cb(tvids),
priority=gobject.PRIORITY_LOW)
- def __rebuild_index_cb(self, uids):
- if uids:
- uid = uids.pop()
+ def __rebuild_index_cb(self, tvids):
+ if tvids:
+ (tree_id,version_id) = tvids.pop()
- logging.debug('Updating entry %r in index. %d to go.' % \
- (uid, len(uids)))
+ logging.debug('Updating entry (%r,%r) in index. %d to go.' % \
+ (tree_id, version_id, len(tvids)))
- if not self._index_store.contains(uid):
+ if not self._index_store.contains(tree_id,version_id):
try:
- props = self._metadata_store.retrieve(uid)
- self._index_store.store(uid, props)
+ props = self._metadata_store.retrieve(tree_id,version_id)
+ self._index_store.store(tree_id, version_id, props)
except Exception:
- logging.error('Error processing %r\n%s.' \
- % (uid, traceback.format_exc()))
+ logging.error('Error processing (%r,%r)\n%s.' \
+ % (tree_id, version_id, traceback.format_exc()))
- if not uids:
+ if not tvids:
logging.debug('Finished updating index.')
layoutmanager.get_instance().index_updated = True
+ self.Ready()
return False
else:
return True
- def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
- logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \
- (async_cb, async_err_cb, uid, exc))
- if exc is not None:
- async_err_cb(exc)
- return
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='ssa{sv}sb',
+ out_signature='ss',
+ byte_arrays=True)
+ @trace(logger=logger)
+ def save(self, tree_id, parent_id, metadata, path, delete_after):
+ return self._save(tree_id, parent_id, metadata, path, delete_after)
- self.Created(uid)
- self._optimizer.optimize(uid)
- logger.debug("created %s" % uid)
- async_cb(uid)
+ def _save(self, tree_id, parent_id, metadata, path, delete_after):
+ # TODO: copy docstring from datastore-redesign.html
+ if (not tree_id) and parent_id :
+ raise sugar.datastore.InvalidArgumentError("tree_id is empty but parent_id is not")
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='a{sv}sb',
- out_signature='s',
- async_callbacks=('async_cb', 'async_err_cb'),
- byte_arrays=True)
- def create(self, props, file_path, transfer_ownership,
- async_cb, async_err_cb):
- uid = str(uuid.uuid4())
- logging.debug('datastore.create %r' % uid)
-
- if not props.get('timestamp', ''):
- props['timestamp'] = int(time.time())
-
- self._metadata_store.store(uid, props)
- self._index_store.store(uid, props)
- self._file_store.store(uid, file_path, transfer_ownership,
- lambda *args: self._create_completion_cb(async_cb,
- async_err_cb,
- uid,
- *args))
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Created(self, uid):
- pass
+ if tree_id and not parent_id :
+ if self._find({'tree_id': tree_id}, {'limit': 1})[1] :
+ raise sugar.datastore.InvalidArgumentError("no parent_id given but tree_id already exists")
+
+ elif parent_id :
+ if not self._find({'tree_id': tree_id, 'version_id': parent_id}, {'limit': 1})[1] :
+ raise sugar.datastore.InvalidArgumentError("given parent_id does not exist")
+
+ if not tree_id :
+ tree_id = self._gen_uuid()
- def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
- logger.debug("_update_completion_cb() called with %r / %r, exc %r" % \
- (async_cb, async_err_cb, exc))
+ child_id = self._gen_uuid()
+
+ # TODO: create branch if required
+
+ metadata['ctime'] = int(time.time())
+ metadata['tree_id'] = tree_id
+ metadata['parent_id'] = parent_id
+ metadata['version_id'] = child_id
+
+ self._metadata_store.store(tree_id, child_id, metadata)
+ # TODO: async
+ self._index_store.store(tree_id, child_id, metadata)
+
+ if (not path) and self._file_store.has_data(tree_id, parent_id) :
+ # metadata-only update, reuse parent data
+ path = self._file_store.retrieve(tree_id, parent_id, os.getuid(), "foo")
+
+ if path:
+ self._file_store.store(tree_id, child_id, path, delete_after,
+ lambda *args: self._save_completion_cb(tree_id, parent_id, child_id, *args))
+
+ return (tree_id, child_id)
+
+ @trace(logger=logger)
+ def _save_completion_cb(self, tree_id, parent_id, child_id, exc=None):
if exc is not None:
- async_err_cb(exc)
+ logging.error("Error during saving of entry (%r,%r,%r):\n%s" % (
+ tree_id, parent_id, child_id, traceback.format_exc(),))
+ # FIXME: what to do on error? for the async API we already guaranteed ACID
return
- self.Updated(uid)
- self._optimizer.optimize(uid)
- logger.debug("updated %s" % uid)
- async_cb()
+ self.Saved(tree_id, parent_id, child_id)
+ self._optimizer.optimize(tree_id, child_id)
+ logger.debug("updated %s %s" % (tree_id, child_id))
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="sss")
+ def Saved(self, tree_id, parent_id, child_id):
+ # TODO: copy docstring from datastore-redesign.html
+ pass
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='sa{sv}sb',
+ in_signature='ssa{sv}',
out_signature='',
- async_callbacks=('async_cb', 'async_err_cb'),
byte_arrays=True)
- def update(self, uid, props, file_path, transfer_ownership,
- async_cb, async_err_cb):
- logging.debug('datastore.update %r' % uid)
-
- if not props.get('timestamp', ''):
- props['timestamp'] = int(time.time())
-
- self._metadata_store.store(uid, props)
- self._index_store.store(uid, props)
-
- if os.path.exists(self._file_store.get_file_path(uid)) and \
- (not file_path or os.path.exists(file_path)):
- self._optimizer.remove(uid)
- self._file_store.store(uid, file_path, transfer_ownership,
- lambda *args: self._update_completion_cb(async_cb,
- async_err_cb,
- uid,
- *args))
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Updated(self, uid):
+ @trace(logger=logger)
+ def change_metadata(self, tree_id, version_id, metadata) :
+ # TODO: copy docstring from datastore-redesign.html
+ self._metadata_store.store(tree_id, version_id, metadata)
+ self._index_store.store(tree_id, version_id, metadata)
+ self.ChangedMetadata(tree_id, version_id, metadata)
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="ssa{sv}")
+ def ChangedMetadata(self, tree_id, version_id, metadata):
pass
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='a{sv}as',
- out_signature='aa{sv}u')
- def find(self, query, properties):
- logging.debug('datastore.find %r' % query)
+ in_signature='a{sv}a{sv}',
+ out_signature='aa{sv}u',
+ byte_arrays=True)
+ @trace(logger=logger)
+ def find(self, query, options):
+ return self._find(query, options)
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='sa{sv}a{sv}',
+ out_signature='aa{sv}u',
+ byte_arrays=True)
+ @trace(logger=logger)
+ def textsearch(self, querystring, query, options) :
+ return self._find(query, options, querystring)
+
+ def _find(self, query, options, querystring = None) :
t = time.time()
+ tvids = None
if layoutmanager.get_instance().index_updated:
try:
- uids, count = self._index_store.find(query)
+ tvids, count = self._index_store.find(query, querystring, options)
except Exception:
logging.error('Failed to query index, will rebuild\n%s' \
% traceback.format_exc())
@@ -206,76 +230,91 @@ class DataStore(dbus.service.Object):
if not layoutmanager.get_instance().index_updated:
logging.warning('Index updating, returning all entries')
- uids = layoutmanager.get_instance().find_all()
- count = len(uids)
+ tvids = layoutmanager.get_instance().find_all()
+ if not options.get('all_versions', False) :
+ # only return latest version for each entry
+ tids_vtime = {}
+ for (tree_id, version_id) in tvids :
+ tids_vtime.setdefault(tree_id, []).append((version_id, self._metadata_store.retrieve(tree_id, version_id, 'timestamp')))
+
+ tvids = [(tree_id, sorted(candidates, key=lambda e: e[1], reverse=True)[0][0])
+ for (tree_id, candidates) in tids_vtime.items()]
+
+ count = len(tvids)
- offset = query.get('offset', 0)
- limit = query.get('limit', MAX_QUERY_LIMIT)
- uids = uids[offset:offset + limit]
+ offset = options.get('offset', 0)
+ limit = options.get('limit', MAX_QUERY_LIMIT)
+ tvids = tvids[offset:offset + limit]
+# logger.debug('tvids=%r' % (tvids,))
entries = []
- for uid in uids:
- metadata = self._metadata_store.retrieve(uid, properties)
+ for (tree_id,version_id) in tvids:
+ metadata = self._metadata_store.retrieve(tree_id, version_id, options.get('metadata'))
entries.append(metadata)
- logger.debug('find(): %r' % (time.time() - t))
+
return entries, count
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
+ in_signature='ss',
out_signature='s',
- sender_keyword='sender')
- def get_filename(self, uid, sender=None):
- logging.debug('datastore.get_filename %r' % uid)
+ sender_keyword='sender',
+ byte_arrays=True)
+ @trace(logger=logger)
+ def get_data(self, tree_id, version_id, sender=None):
+ # TODO: copy docstring from datastore-redesign.html
user_id = dbus.Bus().get_unix_user(sender)
- extension = self._get_extension(uid)
- return self._file_store.retrieve(uid, user_id, extension)
+ extension = self._get_extension(tree_id, version_id)
+ # TODO: async
+ path = self._file_store.retrieve(tree_id, version_id, user_id, extension)
+ self.GotData(sender, tree_id, version_id, path)
+ return path
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="ssss")
+ def GotData(self, sender, tree_id, version_id, path) :
+ pass
- def _get_extension(self, uid):
- mime_type = self._metadata_store.get_property(uid, 'mime_type')
+ def _get_extension(self, tree_id, version_id):
+ mime_type = self._metadata_store.get_property(tree_id, version_id, 'mime_type')
if mime_type is None or not mime_type:
return ''
return mime.get_primary_extension(mime_type)
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
- out_signature='a{sv}')
- def get_properties(self, uid):
- logging.debug('datastore.get_properties %r' % uid)
- metadata = self._metadata_store.retrieve(uid)
- return metadata
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='sa{sv}',
+ in_signature='a{sv}s',
out_signature='as')
- def get_uniquevaluesfor(self, propertyname, query=None):
- if propertyname != 'activity':
- raise ValueError('Only ''activity'' is a supported property name')
+ def find_unique_values(self, query, metadata_name):
+ # TODO: copy docstring
+ # TODO: support for arbitrary metadata names and query
+ if metadata_name != 'bundle_id':
+ raise ValueError('Only ''bundle_id'' is a supported property name')
if query:
raise ValueError('The query parameter is not supported')
if layoutmanager.get_instance().index_updated:
- return self._index_store.get_activities()
+ return self._index_store.get_bundle_ids()
else:
logging.warning('Index updating, returning an empty list')
return []
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
- out_signature='')
- def delete(self, uid):
- self._optimizer.remove(uid)
-
- self._index_store.delete(uid)
- self._file_store.delete(uid)
- self._metadata_store.delete(uid)
+ in_signature='ss',
+ out_signature='',
+ byte_arrays=True)
+ def delete(self, tree_id, version_id):
+ # TODO: version_id=None/'' => remove all versions
+ self._optimizer.remove(tree_id, version_id)
+
+ self._index_store.delete(tree_id, version_id)
+ self._file_store.delete(tree_id, version_id)
+ self._metadata_store.delete(tree_id, version_id)
- entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ entry_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id)
os.removedirs(entry_path)
- self.Deleted(uid)
- logger.debug("deleted %s" % uid)
+ self.Deleted(tree_id, version_id)
+ logger.debug("deleted (%r,%r)" % (tree_id,version_id))
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Deleted(self, uid):
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss")
+ def Deleted(self, tree_id, version_id):
pass
def stop(self):
@@ -288,28 +327,15 @@ class DataStore(dbus.service.Object):
pass
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="sa{sv}",
- out_signature='s')
- def mount(self, uri, options=None):
- return ''
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="",
- out_signature="aa{sv}")
- def mounts(self):
- return [{'id': 1}]
+ in_signature='',
+ out_signature='b')
+ def check_ready(self):
+ return layoutmanager.get_instance().index_updated
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="s",
- out_signature="")
- def unmount(self, mountpoint_id):
- pass
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
- def Mounted(self, descriptior):
+ @dbus.service.signal(DS_DBUS_INTERFACE)
+ def Ready(self):
pass
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
- def Unmounted(self, descriptor):
- pass
+ def _gen_uuid(self) :
+ return str(uuid.uuid4())
diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py
index f88c531..eebec8c 100644
--- a/src/carquinyol/filestore.py
+++ b/src/carquinyol/filestore.py
@@ -29,15 +29,15 @@ class FileStore(object):
# TODO: add protection against store and retrieve operations on entries
# that are being processed async.
- def store(self, uid, file_path, transfer_ownership, completion_cb):
+ def store(self, tree_id, version_id, file_path, transfer_ownership, completion_cb):
"""Store a file for a given entry.
"""
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ dir_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
- destination_path = os.path.join(dir_path, 'data')
+ destination_path = layoutmanager.get_instance().get_data_path(tree_id, version_id)
if file_path:
if not os.path.isfile(file_path):
raise ValueError('No file at %r' % file_path)
@@ -75,15 +75,20 @@ class FileStore(object):
async_copy = AsyncCopy(file_path, destination_path, completion_cb)
async_copy.start()
- def retrieve(self, uid, user_id, extension):
+ def has_data(self, tree_id, version_id) :
+ dir_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id)
+ file_path = os.path.join(dir_path, 'data')
+ return os.path.exists(file_path)
+
+ def retrieve(self, tree_id, version_id, user_id, extension):
"""Place the file associated to a given entry into a directory where the
user can read it. The caller is reponsible for deleting this file.
"""
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- file_path = os.path.join(dir_path, 'data')
+ dir_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id)
+ file_path = layoutmanager.get_instance().get_data_path(tree_id, version_id)
if not os.path.exists(file_path):
- logging.debug('Entry %r doesnt have any file' % uid)
+ logging.debug('Entry (%r,%r) doesnt have any file' % (tree_id,version_id))
return ''
use_instance_dir = os.path.exists('/etc/olpc-security') and \
@@ -105,21 +110,13 @@ class FileStore(object):
elif extension:
extension = '.' + extension
- destination_path = os.path.join(destination_dir, uid + extension)
-
- attempt = 1
- while os.path.exists(destination_path):
- if attempt > 10:
- fd_, destination_path = tempfile.mkstemp(prefix=uid,
- suffix=extension,
- dir=destination_dir)
- del fd_
- os.unlink(destination_path)
- break
- else:
- file_name = '%s_%s%s' % (uid, attempt, extension)
- destination_path = os.path.join(destination_dir, file_name)
- attempt += 1
+ base_name = "%s-%s" % (tree_id, version_id)
+ fd_, destination_path = tempfile.mkstemp(prefix=base_name,
+ suffix=extension,
+ dir=destination_dir)
+ del fd_
+ # FIXME
+ os.unlink(destination_path)
# Try to hard link from the original file to the targetpath. This can
# fail if the file is in a different filesystem. Do a symlink instead.
@@ -128,39 +125,33 @@ class FileStore(object):
except OSError, e:
if e.errno == errno.EXDEV:
os.symlink(file_path, destination_path)
- else:
- raise
- # Try to make the original file readable. This can fail if the file is
- # in a FAT filesystem.
- try:
- os.chmod(file_path, 0604)
- except OSError, e:
- if e.errno != errno.EPERM:
+ # Try to make the original file readable. This can fail if the file is
+ # in a FAT filesystem.
+ try:
+ os.chmod(file_path, 0444)
+ except OSError, e:
+ if e.errno != errno.EPERM:
+ raise
+ else:
raise
return destination_path
- def get_file_path(self, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- return os.path.join(dir_path, 'data')
+ def get_file_path(self, tree_id, version_id):
+ return layoutmanager.get_instance().get_data_path(tree_id, version_id)
- def delete(self, uid):
+ def delete(self, tree_id, version_id):
"""Remove the file associated to a given entry.
"""
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- file_path = os.path.join(dir_path, 'data')
+ file_path = layoutmanager.get_instance().get_data_path(tree_id, version_id)
if os.path.exists(file_path):
os.remove(file_path)
- def hard_link_entry(self, new_uid, existing_uid):
- existing_file = os.path.join(
- layoutmanager.get_instance().get_entry_path(existing_uid),
- 'data')
- new_file = os.path.join(
- layoutmanager.get_instance().get_entry_path(new_uid),
- 'data')
+ def hard_link_entry(self, new_tree_id, new_version_id, existing_tree_id, existing_version_id):
+ existing_file = layoutmanager.get_instance().get_data_path(existing_tree_id, existing_version_id)
+ new_file = layoutmanager.get_instance().get_data_path(new_tree_id, new_version_id)
logging.debug('removing %r' % new_file)
os.remove(new_file)
diff --git a/src/carquinyol/indexstore.py b/src/carquinyol/indexstore.py
index 62aebb4..a8061a3 100644
--- a/src/carquinyol/indexstore.py
+++ b/src/carquinyol/indexstore.py
@@ -24,11 +24,13 @@ from xapian import WritableDatabase, Document, Enquire, Query, QueryParser
from carquinyol import layoutmanager
from carquinyol.layoutmanager import MAX_QUERY_LIMIT
-_VALUE_UID = 0
+_VALUE_TID = 0
_VALUE_TIMESTAMP = 1
+_VALUE_VID = 2
-_PREFIX_UID = 'Q'
-_PREFIX_ACTIVITY = 'A'
+_PREFIX_TID = 'Q'
+_PREFIX_VID = 'V'
+_PREFIX_BUNDLE_ID = 'A'
_PREFIX_ACTIVITY_ID = 'I'
_PREFIX_MIME_TYPE = 'M'
_PREFIX_KEEP = 'K'
@@ -66,24 +68,28 @@ class IndexStore(object):
for f in os.listdir(index_path):
os.remove(os.path.join(index_path, f))
- def contains(self, uid):
- postings = self._database.postlist(_PREFIX_UID + uid)
+ def contains(self, tree_id, version_id):
+ postings = self._database.postlist(_PREFIX_TID + tree_id + _PREFIX_VID + version_id)
try:
postlist_item = postings.next()
except StopIteration:
return False
return True
- def store(self, uid, properties):
+ def store(self, tree_id, version_id, properties):
document = Document()
- document.add_term(_PREFIX_UID + uid)
- document.add_term(_PREFIX_ACTIVITY + properties.get('activity', ''))
+ docid = "%s%s%s%s" % (_PREFIX_TID, tree_id, _PREFIX_VID, version_id)
+ document.add_term(docid)
+ document.add_term(_PREFIX_TID + tree_id)
+ document.add_term(_PREFIX_VID + version_id)
+ document.add_term(_PREFIX_BUNDLE_ID + properties.get('bundle_id', ''))
document.add_term(_PREFIX_MIME_TYPE + properties.get('mime_type', ''))
document.add_term(_PREFIX_ACTIVITY_ID +
properties.get('activity_id', ''))
document.add_term(_PREFIX_KEEP + str(properties.get('keep', 0)))
- document.add_value(_VALUE_UID, uid)
+ document.add_value(_VALUE_TID, tree_id)
+ document.add_value(_VALUE_VID, version_id)
document.add_value(_VALUE_TIMESTAMP, str(properties['timestamp']))
term_generator = xapian.TermGenerator()
@@ -103,10 +109,10 @@ class IndexStore(object):
term_generator.index_text_without_positions(
self._extract_text(properties), 1, '')
- if not self.contains(uid):
+ if not self.contains(tree_id, version_id):
self._database.add_document(document)
else:
- self._database.replace_document(_PREFIX_UID + uid, document)
+ self._database.replace_document(docid, document)
self._flush()
def _extract_text(self, properties):
@@ -122,33 +128,45 @@ class IndexStore(object):
text += value
return text
- def find(self, query):
+ def find(self, query, querystring, options):
enquire = Enquire(self._database)
- enquire.set_query(self._parse_query(query))
- offset = query.get('offset', 0)
- limit = query.get('limit', MAX_QUERY_LIMIT)
+ offset = options.pop('offset', 0)
+ limit = options.pop('limit', MAX_QUERY_LIMIT)
+ all_versions = options.pop('all_versions', False)
+
+ enquire.set_query(self._parse_query(query, querystring))
# This will assure that the results count is exact.
check_at_least = offset + limit + 1
enquire.set_sort_by_value(_VALUE_TIMESTAMP, True)
+ if not all_versions :
+ # only select newest entry (sort order) for each tree_id
+ enquire.set_collapse_key(_VALUE_TID)
+
query_result = enquire.get_mset(offset, limit, check_at_least)
total_count = query_result.get_matches_estimated()
- uids = []
- for hit in query_result:
- uids.append(hit.document.get_value(_VALUE_UID))
-
- return (uids, total_count)
-
- def _parse_query(self, query_dict):
- logging.debug('_parse_query %r' % query_dict)
+ tvids = [(hit.document.get_value(_VALUE_TID),
+ hit.document.get_value(_VALUE_VID))
+ for hit in query_result]
+
+ return (tvids, total_count)
+
+ _queryTermMap = {
+ 'tree_id': _PREFIX_TID,
+ 'version_id': _PREFIX_VID,
+ 'bundle_id': _PREFIX_BUNDLE_ID,
+ 'activity_id': _PREFIX_ACTIVITY_ID,
+ 'keep': _PREFIX_KEEP,
+ }
+ def _parse_query(self, query_dict, query_str):
+ logging.debug('_parse_query %r %r', query_dict, query_str)
queries = []
- query_str = query_dict.pop('query', None)
- if query_str is not None:
+ if query_str:
query_parser = QueryParser()
query_parser.set_database(self._database)
#query_parser.set_default_op(Query.OP_AND)
@@ -167,6 +185,12 @@ class IndexStore(object):
queries.append(query)
+ # metadata -> term for simple datatypes (string, bool)
+ for (m_name, term_prefix) in self._queryTermMap.items() :
+ m_value = query_dict.pop(m_name, None)
+ if m_value is not None:
+ queries.append(Query(term_prefix+str(m_value)))
+
timestamp = query_dict.pop('timestamp', None)
if timestamp is not None:
start = str(timestamp.pop('start', 0))
@@ -174,24 +198,6 @@ class IndexStore(object):
query = Query(Query.OP_VALUE_RANGE, _VALUE_TIMESTAMP, start, end)
queries.append(query)
- uid = query_dict.pop('uid', None)
- if uid is not None:
- queries.append(Query(_PREFIX_UID + uid))
-
- activity = query_dict.pop('activity', None)
- if activity is not None:
- queries.append(Query(_PREFIX_ACTIVITY + activity))
-
- activity_id = query_dict.pop('activity_id', None)
- if activity_id is not None:
- query = Query(_PREFIX_ACTIVITY_ID + activity_id)
- queries.append(query)
-
- keep = query_dict.pop('keep', None)
- if keep is not None:
- query = Query(_PREFIX_KEEP + str(keep))
- queries.append(query)
-
mime_type = query_dict.pop('mime_type', None)
if mime_type is not None:
mime_queries = []
@@ -203,18 +209,18 @@ class IndexStore(object):
queries.append(Query(''))
if query_dict:
- logging.warning('Unknown term(s): %r' % query_dict)
+ logging.warning('Unknown term(s): %r', query_dict)
return Query(Query.OP_AND, queries)
- def delete(self, uid):
- self._database.delete_document(_PREFIX_UID + uid)
+ def delete(self, tree_id, version_id):
+ self._database.delete_document("%s%s%s%s" % (_PREFIX_TID, tree_id, _PREFIX_VID, version_id))
- def get_activities(self):
- activities = []
- for term in self._database.allterms(_PREFIX_ACTIVITY):
- activities.append(term.term[len(_PREFIX_ACTIVITY):])
- return activities
+ def get_bundle_ids(self):
+ bundle_ids = []
+ for term in self._database.allterms(_PREFIX_BUNDLE_ID):
+ bundle_ids.append(term.term[len(_PREFIX_BUNDLE_ID):])
+ return bundle_ids
def _flush_timeout_cb(self):
self._flush(True)
diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py
index dc3fde6..8be2a11 100644
--- a/src/carquinyol/layoutmanager.py
+++ b/src/carquinyol/layoutmanager.py
@@ -53,9 +53,17 @@ class LayoutManager(object):
version_path = os.path.join(self._root_path, 'version')
open(version_path, 'w').write(str(version))
- def get_entry_path(self, uid):
+ def get_entry_path(self, tree_id, version_id):
# os.path.join() is just too slow
- return '%s/%s/%s' % (self._root_path, uid[:2], uid)
+ return '%s/%s/%s-%s' % (self._root_path, tree_id[:2], tree_id, version_id)
+
+ def get_data_path(self, tree_id, version_id) :
+ # os.path.join() is just too slow
+ return '%s/%s/%s-%s/data' % (self._root_path, tree_id[:2], tree_id, version_id)
+
+ def get_metadata_path(self, tree_id, version_id) :
+ # os.path.join() is just too slow
+ return '%s/%s/%s-%s/metadata' % (self._root_path, tree_id[:2], tree_id, version_id)
def get_root_path(self):
return self._root_path
@@ -85,13 +93,13 @@ class LayoutManager(object):
index_updated = property(_is_index_updated, _set_index_updated)
def find_all(self):
- uids = []
+ tvids = []
for f in os.listdir(self._root_path):
if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2:
for g in os.listdir(os.path.join(self._root_path, f)):
- if len(g) == 36:
- uids.append(g)
- return uids
+ if len(g) == 73:
+ tvids.append((g[:36], g[37:]))
+ return tvids
_instance = None
def get_instance():
diff --git a/src/carquinyol/metadatareader.c b/src/carquinyol/metadatareader.c
index 08be17e..edc338a 100644
--- a/src/carquinyol/metadatareader.c
+++ b/src/carquinyol/metadatareader.c
@@ -8,7 +8,7 @@
static PyObject *byte_array_type = NULL;
int
-add_property(char *metadata_path, char *property_name, PyObject *dict,
+add_property(const char *metadata_path, char *property_name, PyObject *dict,
int must_exist)
{
int file_path_size;
@@ -125,7 +125,7 @@ cleanup:
}
static PyObject *
-read_from_properties_list (char *metadata_path, PyObject *properties)
+read_from_properties_list (const char *metadata_path, PyObject *properties)
{
PyObject *dict = PyDict_New();
@@ -148,7 +148,7 @@ cleanup:
}
static PyObject *
-read_all_properties (char *metadata_path)
+read_all_properties (const char *metadata_path)
{
PyObject *dict = PyDict_New();
DIR *dir_stream = NULL;
@@ -198,34 +198,22 @@ metadatareader_retrieve(PyObject *unused, PyObject *args)
{
PyObject *dict = NULL;
PyObject *properties = NULL;
- const char *dir_path = NULL;
- char *metadata_path = NULL;
+ const char *metadata_path = NULL;
- if (!PyArg_ParseTuple(args, "sO:retrieve", &dir_path, &properties))
+ if (!PyArg_ParseTuple(args, "sO:retrieve", &metadata_path, &properties))
return NULL;
- // Build path to the metadata directory
- int metadata_path_size = strlen(dir_path) + 10;
- metadata_path = PyMem_Malloc(metadata_path_size);
- if (metadata_path == NULL) {
- PyErr_NoMemory();
- return NULL;
- }
- snprintf (metadata_path, metadata_path_size, "%s/%s", dir_path, "metadata");
-
if ((properties != Py_None) && (PyList_Size(properties) > 0)) {
dict = read_from_properties_list(metadata_path, properties);
} else {
dict = read_all_properties(metadata_path);
}
- PyMem_Free(metadata_path);
-
return dict;
}
static PyMethodDef metadatareader_functions[] = {
- {"retrieve", metadatareader_retrieve, METH_VARARGS, PyDoc_STR("Read a dictionary from a file")},
+ {"retrieve", metadatareader_retrieve, METH_VARARGS, PyDoc_STR("Read a dictionary from a directory with a single file (containing the content) per key")},
{NULL, NULL, 0, NULL}
};
diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py
index 50981f3..ffec4a8 100644
--- a/src/carquinyol/metadatastore.py
+++ b/src/carquinyol/metadatastore.py
@@ -6,19 +6,17 @@ from carquinyol import metadatareader
MAX_SIZE = 256
class MetadataStore(object):
- def store(self, uid, metadata):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- if not os.path.exists(dir_path):
- os.makedirs(dir_path)
-
- metadata_path = os.path.join(dir_path, 'metadata')
+ def store(self, tree_id, version_id, metadata):
+ metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id)
if not os.path.exists(metadata_path):
os.makedirs(metadata_path)
else:
for key in os.listdir(metadata_path):
- os.remove(os.path.join(metadata_path, key))
+ if key not in metadata :
+ os.remove(os.path.join(metadata_path, key))
- metadata['uid'] = uid
+ metadata['tree_id'] = tree_id
+ metadata['version_id'] = version_id
for key, value in metadata.items():
# Hack to support activities that still pass properties named as for
@@ -36,29 +34,29 @@ class MetadataStore(object):
finally:
f.close()
- def retrieve(self, uid, properties=None):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- return metadatareader.retrieve(dir_path, properties)
+ def retrieve(self, tree_id, version_id, properties=None):
+ metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id)
+ if not os.path.exists(metadata_path) :
+ return None
+
+ return metadatareader.retrieve(metadata_path, properties)
- def delete(self, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- metadata_path = os.path.join(dir_path, 'metadata')
+ def delete(self, tree_id, version_id):
+ metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id)
for key in os.listdir(metadata_path):
os.remove(os.path.join(metadata_path, key))
os.rmdir(metadata_path)
- def get_property(self, uid, key):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- metadata_path = os.path.join(dir_path, 'metadata')
+ def get_property(self, tree_id, version_id, key):
+ metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id)
property_path = os.path.join(metadata_path, key)
if os.path.exists(property_path):
return open(property_path, 'r').read()
else:
return None
- def set_property(self, uid, key, value):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- metadata_path = os.path.join(dir_path, 'metadata')
+ def set_property(self, tree_id, version_id, key, value):
+ metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id)
property_path = os.path.join(metadata_path, key)
open(property_path, 'w').write(value)
diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py
index 228db2a..d979b5e 100644
--- a/src/carquinyol/migration.py
+++ b/src/carquinyol/migration.py
@@ -22,15 +22,19 @@ import logging
import shutil
import time
import traceback
+import uuid
import cjson
-from carquinyol import layoutmanager
+from carquinyol import layoutmanager, metadatastore
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
+mstore = metadatastore.MetadataStore()
+
+
def migrate_from_0():
- logging.info('Migrating datastore from version 0 to version 1')
+ logging.info('Migrating datastore from version 0 to version 2')
root_path = layoutmanager.get_instance().get_root_path()
old_root_path = os.path.join(root_path, 'store')
@@ -38,18 +42,19 @@ def migrate_from_0():
return
for f in os.listdir(old_root_path):
- uid, ext = os.path.splitext(f)
+ tree_id, ext = os.path.splitext(f)
if ext != '.metadata':
continue
- logging.debug('Migrating entry %r' % uid)
+ logging.debug('Migrating entry %r' % tree_id)
+ version_id = str(uuid.uuid4())
try:
- _migrate_metadata(root_path, old_root_path, uid)
- _migrate_file(root_path, old_root_path, uid)
- _migrate_preview(root_path, old_root_path, uid)
+ _migrate_metadata_0(root_path, old_root_path, tree_id, version_id)
+ _migrate_file_0(root_path, old_root_path, tree_id, version_id)
+ _migrate_preview_0(root_path, old_root_path, tree_id, version_id)
except Exception:
logging.error('Error while migrating entry %r: %s\n' % \
- (uid, traceback.format_exc()))
+ (tree_id, traceback.format_exc()))
# Just be paranoid, it's cheap.
if old_root_path.endswith('datastore/store'):
@@ -57,46 +62,94 @@ def migrate_from_0():
logging.info('Migration finished')
-def _migrate_metadata(root_path, old_root_path, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- metadata_path = os.path.join(dir_path, 'metadata')
- os.makedirs(metadata_path)
+def _migrate_metadata_0(root_path, old_root_path, tree_id, version_id):
+ dir_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id)
+ metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id)
- old_metadata_path = os.path.join(old_root_path, uid + '.metadata')
+ old_metadata_path = os.path.join(old_root_path, tree_id + '.metadata')
metadata = cjson.decode(open(old_metadata_path, 'r').read())
- if 'uid' not in metadata:
- metadata['uid'] = uid
+ metadata['tree_id'] = tree_id
+ metadata['version_id'] = version_id
+ metadata['parent_id'] = ''
if 'timestamp' not in metadata and 'mtime' in metadata:
- metadata['timestamp'] = \
+ metadata['ctime'] = \
time.mktime(time.strptime(metadata['mtime'], DATE_FORMAT))
+ elif 'timestamp' in metadata :
+ metadata['ctime'] = metadata.pop('timestamp')
+ else :
+ metadata['ctime'] = os.stat(old_metadata_path).st_ctime
+
+ try:
+ mstore.store(tree_id, version_id, metadata)
+
+ except:
+ logging.error(
+ 'Error while migrating property entry %s: %s\n' % \
+ (tree_id, traceback.format_exc()))
+
+def _migrate_file_0(root_path, old_root_path, tree_id, version_id):
+ if os.path.exists(os.path.join(old_root_path, tree_id)):
+ new_data_path = layoutmanager.get_instance().get_data_path(tree_id, version_id)
+ os.rename(os.path.join(old_root_path, tree_id),
+ new_data_path)
+
+def _migrate_preview_0(root_path, old_root_path, tree_id, version_id):
+ metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id)
+ os.rename(os.path.join(old_root_path, 'preview', tree_id),
+ os.path.join(metadata_path, 'preview'))
- for key, value in metadata.items():
- try:
- f = open(os.path.join(metadata_path, key), 'w')
+
+def migrate_from_1():
+ logging.info('Migrating datastore from version 1 to version 2')
+
+ root_path = layoutmanager.get_instance().get_root_path()
+ checksum_path = layoutmanager.get_instance().get_checksums_dir()
+
+ version_ids = {}
+ for hash02 in os.listdir(root_path):
+ if len(hash02) != 2 :
+ continue
+
+ for tree_id in os.listdir(os.path.join(root_path, hash02)) :
+ if (len(tree_id) != 36) :
+ continue
+
+ logging.debug('Migrating entry %r' % tree_id)
+
+ version_id = str(uuid.uuid4())
+ version_ids[tree_id] = version_id
try:
- if isinstance(value, unicode):
- value = value.encode('utf-8')
- if not isinstance(value, basestring):
- value = str(value)
- f.write(value)
- finally:
- f.close()
- except Exception:
- logging.error(
- 'Error while migrating property %s of entry %s: %s\n' % \
- (key, uid, traceback.format_exc()))
-
-def _migrate_file(root_path, old_root_path, uid):
- if os.path.exists(os.path.join(old_root_path, uid)):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- os.rename(os.path.join(old_root_path, uid),
- os.path.join(dir_path, 'data'))
-
-def _migrate_preview(root_path, old_root_path, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
- metadata_path = os.path.join(dir_path, 'metadata')
- os.rename(os.path.join(old_root_path, 'preview', uid),
- os.path.join(metadata_path, 'preview'))
+ new_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id)
+ new_metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id)
+ os.rename(os.path.join(root_path, hash02, tree_id),
+ new_path)
+ metadata = mstore.retrieve(tree_id, version_id)
+ metadata['tree_id'] = tree_id
+ metadata['version_id'] = version_id
+ metadata['parent_id'] = ''
+ metadata['ctime'] = metadata.get('timestamp', os.stat(new_path).st_ctime)
+ metadata.pop('uid')
+ mstore.store(tree_id, version_id, metadata)
+
+ except Exception:
+ logging.error('Error while migrating entry %r: %s\n' % \
+ (tree_id, traceback.format_exc()))
+
+ for checksum in os.listdir(checksum_path) :
+ entries_path = os.path.join(checksum_path, checksum)
+ for tree_id in os.listdir(entries_path) :
+ if len(tree_id) != 36 :
+ continue
+
+ try :
+ os.rename(os.path.join(entries_path, tree_id),
+ os.path.join(entries_path, "%s-%s" % (tree_id, version_ids[tree_id])))
+
+ except Exception:
+ logging.error('Error while migrating checksum entry %r / %r: %s\n' % \
+ (checksum, tree_id, traceback.format_exc()))
+
+ logging.info('Migration finished')
diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py
index f8a2e3e..19849e6 100644
--- a/src/carquinyol/optimizer.py
+++ b/src/carquinyol/optimizer.py
@@ -18,11 +18,13 @@ import os
import errno
import logging
import subprocess
+import uuid
import gobject
from carquinyol import layoutmanager
+# TODO: use layoutmanager for entries in 'checksums' directory
class Optimizer(object):
"""Optimizes disk space usage by detecting duplicates and sharing storage.
"""
@@ -31,33 +33,34 @@ class Optimizer(object):
self._metadata_store = metadata_store
self._enqueue_checksum_id = None
- def optimize(self, uid):
+ def optimize(self, tree_id, version_id):
"""Add an entry to a queue of entries to be checked for duplicates.
"""
- if not os.path.exists(self._file_store.get_file_path(uid)):
+ if not os.path.exists(self._file_store.get_file_path(tree_id, version_id)):
return
queue_path = layoutmanager.get_instance().get_queue_path()
- open(os.path.join(queue_path, uid), 'w').close()
- logging.debug('optimize %r' % os.path.join(queue_path, uid))
+ fname = os.path.join(queue_path, "%s-%s" % (tree_id, version_id))
+ open(fname, 'w').close()
+ logging.debug('optimize %r' % fname)
if self._enqueue_checksum_id is None:
self._enqueue_checksum_id = \
gobject.idle_add(self._process_entry_cb,
priority=gobject.PRIORITY_LOW)
- def remove(self, uid):
+ def remove(self, tree_id, version_id):
"""Remove any structures left from space optimization
"""
- checksum = self._metadata_store.get_property(uid, 'checksum')
+ checksum = self._metadata_store.get_property(tree_id, version_id, 'checksum')
if checksum is None:
return
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- checksum_entry_path = os.path.join(checksum_path, uid)
+ checksum_entry_path = os.path.join(checksum_path, "%s-%s" % (tree_id,version_id))
if os.path.exists(checksum_entry_path):
logging.debug('remove %r' % checksum_entry_path)
@@ -79,14 +82,14 @@ class Optimizer(object):
checksum_path = os.path.join(checksums_dir, checksum)
return os.path.exists(checksum_path)
- def _get_uid_from_checksum(self, checksum):
+ def _get_uvid_from_checksum(self, checksum):
"""Get an existing entry which file matches checksum.
"""
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- first_uid = os.listdir(checksum_path)[0]
- return first_uid
+ first_uvid = os.listdir(checksum_path)[0]
+ return (first_uvid[:36], first_uvid[37:])
def _create_checksum_dir(self, checksum):
"""Create directory that tracks files with this same checksum.
@@ -97,24 +100,25 @@ class Optimizer(object):
logging.debug('create dir %r' % checksum_path)
os.mkdir(checksum_path)
- def _add_checksum_entry(self, uid, checksum):
- """Create a file in the checksum dir with the uid of the entry
+ def _add_checksum_entry(self, tree_id, version_id, checksum):
+ """Create a file in the checksum dir with the tree_id and version_id of the entry
"""
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- logging.debug('touch %r' % os.path.join(checksum_path, uid))
- open(os.path.join(checksum_path, uid), 'w').close()
+ fname = os.path.join(checksum_path, "%s-%s" % (tree_id,version_id))
+ logging.debug('touch %r' % fname)
+ open(fname, 'w').close()
- def _already_linked(self, uid, checksum):
+ def _already_linked(self, tree_id, version_id, checksum):
"""Check if this entry's file is already a hard link to the checksums
dir.
"""
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- return os.path.exists(os.path.join(checksum_path, uid))
+ return os.path.exists(os.path.join(checksum_path, "%s-%s" % (tree_id,version_id)))
def _process_entry_cb(self):
"""Process one item in the checksums queue by calculating its checksum,
@@ -125,30 +129,30 @@ class Optimizer(object):
queue_path = layoutmanager.get_instance().get_queue_path()
queue = os.listdir(queue_path)
if queue:
- uid = queue[0]
- logging.debug('_process_entry_cb processing %r' % uid)
+ (tree_id,version_id) = queue[0][:36], queue[0][37:]
+ logging.debug('_process_entry_cb processing (%r,%r)' % (tree_id,version_id))
- file_in_entry_path = self._file_store.get_file_path(uid)
+ file_in_entry_path = self._file_store.get_file_path(tree_id,version_id)
if not os.path.exists(file_in_entry_path):
- logging.info('non-existent entry in queue: %r' % uid)
+ logging.info('non-existent entry in queue: (%r,%r)' % (tree_id,version_id))
else:
checksum = self._calculate_md5sum(file_in_entry_path)
- self._metadata_store.set_property(uid, 'checksum', checksum)
+ self._metadata_store.set_property(tree_id, version_id, 'checksum', checksum)
if self._identical_file_already_exists(checksum):
- if not self._already_linked(uid, checksum):
- existing_entry_uid = \
- self._get_uid_from_checksum(checksum)
+ if not self._already_linked(tree_id, version_id, checksum):
+ existing_entry_uvid = \
+ self._get_uvid_from_checksum(checksum)
- self._file_store.hard_link_entry(uid,
- existing_entry_uid)
+ self._file_store.hard_link_entry(tree_id, version_id,
+ *existing_entry_uvid)
- self._add_checksum_entry(uid, checksum)
+ self._add_checksum_entry(tree_id, version_id, checksum)
else:
self._create_checksum_dir(checksum)
- self._add_checksum_entry(uid, checksum)
+ self._add_checksum_entry(tree_id, version_id, checksum)
- os.remove(os.path.join(queue_path, uid))
+ os.remove(os.path.join(queue_path, "%s-%s" % (tree_id,version_id)))
if len(queue) <= 1:
self._enqueue_checksum_id = None