author     Sascha Silbe <sascha@silbe.org>    2009-08-17 10:50:12 (GMT)
committer  Sascha Silbe <sascha@silbe.org>    2009-08-17 10:50:12 (GMT)
commit     dc255f5558b1e8c75e8d1fccb3cd48b741f8f35c (patch)
tree       ffbbb4dd78df392fe2aeea9b7246b2ff988638e7
parent     d7ab281cabeab4fe3529ca1fa14b8a15895b9d36 (diff)
prototype take 1: add version_id / vid everywhere (rebased)
-rw-r--r--  src/carquinyol/__init__.py         8
-rw-r--r--  src/carquinyol/datastore.py      206
-rw-r--r--  src/carquinyol/filestore.py       28
-rw-r--r--  src/carquinyol/indexstore.py      45
-rw-r--r--  src/carquinyol/layoutmanager.py   12
-rw-r--r--  src/carquinyol/metadatastore.py   22
-rw-r--r--  src/carquinyol/migration.py       71
-rw-r--r--  src/carquinyol/optimizer.py       61
8 files changed, 266 insertions, 187 deletions
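
For orientation, this is roughly what the changed D-Bus interface looks like from a caller's side. The service and object names below are the usual Sugar datastore ones and are not part of this patch, so treat the snippet as a sketch rather than a tested client:

    # Sketch of a client-side call against the new versioned API
    # (assumed service/object names; not part of this patch).
    import dbus

    bus = dbus.SessionBus()
    data_store = dbus.Interface(
        bus.get_object('org.laptop.sugar.DataStore',
                       '/org/laptop/sugar/DataStore'),
        'org.laptop.sugar.DataStore')

    # create() now returns a (uid, vid) pair instead of a bare uid.
    uid, vid = data_store.create({'title': 'example'}, '', False)

    # update() takes both identifiers; when a file path is passed, the
    # datastore allocates a fresh vid for the new data version.
    uid, vid = data_store.update(uid, vid, {'title': 'example, edited'}, '', False)
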
diff --git a/src/carquinyol/__init__.py b/src/carquinyol/__init__.py
index e69de29..d53da54 100644
--- a/src/carquinyol/__init__.py
+++ b/src/carquinyol/__init__.py
@@ -0,0 +1,8 @@
+import logging
+import decorator
+
+@decorator.decorator
+def trace(f, *args, **kwargs) :
+ logging.debug("%s(%s)" % (f.__name__, ", ".join([repr(a) for a in args] + ['%s=%r' % (k, v) for (k, v) in kwargs.items()])))
+ return f(*args, **kwargs)
+
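
A minimal, hypothetical use of the trace decorator added above (it relies on the python-decorator package this module now imports):

    import logging
    from carquinyol import trace

    logging.basicConfig(level=logging.DEBUG)

    @trace
    def add(a, b):
        return a + b

    add(1, 2)   # emits a debug line like "add(1, 2)", then returns 3
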
diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py
index 41b16b5..9b66ccd 100644
--- a/src/carquinyol/datastore.py
+++ b/src/carquinyol/datastore.py
@@ -27,6 +27,7 @@ import gobject
from sugar import mime
+from carquinyol import trace
from carquinyol import layoutmanager
from carquinyol import migration
from carquinyol.layoutmanager import MAX_QUERY_LIMIT
@@ -62,6 +63,7 @@ class DataStore(dbus.service.Object):
layout_manager.set_version(layoutmanager.CURRENT_LAYOUT_VERSION)
layout_manager.index_updated = False
elif layout_manager.get_version() == 1:
+ migration.migrate_from_1()
layout_manager.set_version(layoutmanager.CURRENT_LAYOUT_VERSION)
layout_manager.index_updated = False
@@ -86,120 +88,127 @@ class DataStore(dbus.service.Object):
self._optimizer = Optimizer(self._file_store, self._metadata_store)
def _rebuild_index(self):
- uids = layoutmanager.get_instance().find_all()
- logging.debug('Going to update the index with uids %r' % uids)
- gobject.idle_add(lambda: self.__rebuild_index_cb(uids),
+ uvids = layoutmanager.get_instance().find_all()
+ logging.debug('Going to update the index with uvids %r' % uvids)
+ gobject.idle_add(lambda: self.__rebuild_index_cb(uvids),
priority=gobject.PRIORITY_LOW)
- def __rebuild_index_cb(self, uids):
- if uids:
- uid = uids.pop()
+ def __rebuild_index_cb(self, uvids):
+ if uvids:
+ (uid,vid) = uvids.pop()
- logging.debug('Updating entry %r in index. %d to go.' % \
- (uid, len(uids)))
+ logging.debug('Updating entry (%r,%r) in index. %d to go.' % \
+ (uid, vid, len(uvids)))
- if not self._index_store.contains(uid):
+ if not self._index_store.contains(uid,vid):
try:
- props = self._metadata_store.retrieve(uid)
- self._index_store.store(uid, props)
+ props = self._metadata_store.retrieve(uid,vid)
+ self._index_store.store(uid, vid, props)
except Exception:
- logging.error('Error processing %r\n%s.' \
- % (uid, traceback.format_exc()))
+ logging.error('Error processing (%r,%r)\n%s.' \
+ % (uid, vid, traceback.format_exc()))
- if not uids:
+ if not uvids:
logging.debug('Finished updating index.')
layoutmanager.get_instance().index_updated = True
return False
else:
return True
- def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
- logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \
- (async_cb, async_err_cb, uid, exc))
+ @trace
+ def _create_completion_cb(self, async_cb, async_err_cb, uid, vid, exc=None):
if exc is not None:
async_err_cb(exc)
return
- self.Created(uid)
- self._optimizer.optimize(uid)
- logger.debug("created %s" % uid)
- async_cb(uid)
+ self.Created(uid, vid)
+ self._optimizer.optimize(uid, vid)
+ logger.debug("created (%s,%s)" % (uid,vid))
+ async_cb(uid,vid)
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='a{sv}sb',
- out_signature='s',
+ out_signature='ss',
async_callbacks=('async_cb', 'async_err_cb'),
byte_arrays=True)
def create(self, props, file_path, transfer_ownership,
async_cb, async_err_cb):
uid = str(uuid.uuid4())
- logging.debug('datastore.create %r' % uid)
+ vid = str(uuid.uuid4()) # use fake for now
+ logging.debug('datastore.create %r %r' % (uid, vid))
if not props.get('timestamp', ''):
props['timestamp'] = int(time.time())
- self._metadata_store.store(uid, props)
- self._index_store.store(uid, props)
- self._file_store.store(uid, file_path, transfer_ownership,
+ self._metadata_store.store(uid, vid, props)
+ self._index_store.store(uid, vid, props)
+ self._file_store.store(uid, vid, file_path, transfer_ownership,
lambda *args: self._create_completion_cb(async_cb,
async_err_cb,
uid,
+ vid,
*args))
+ return (uid, vid)
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Created(self, uid):
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss")
+ def Created(self, uid, vid):
pass
- def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
- logger.debug("_update_completion_cb() called with %r / %r, exc %r" % \
- (async_cb, async_err_cb, exc))
+ @trace
+ def _update_completion_cb(self, async_cb, async_err_cb, uid, vid, exc=None):
if exc is not None:
async_err_cb(exc)
return
- self.Updated(uid)
- self._optimizer.optimize(uid)
- logger.debug("updated %s" % uid)
- async_cb()
+ self.Updated(uid,vid)
+ self._optimizer.optimize(uid,vid)
+ logger.debug("updated %s %s" % (uid, vid))
+ async_cb(uid, vid)
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='sa{sv}sb',
- out_signature='',
+ in_signature='ssa{sv}sb',
+ out_signature='ss',
async_callbacks=('async_cb', 'async_err_cb'),
byte_arrays=True)
- def update(self, uid, props, file_path, transfer_ownership,
+ @trace
+ def update(self, uid, vid, props, file_path, transfer_ownership,
async_cb, async_err_cb):
- logging.debug('datastore.update %r' % uid)
if not props.get('timestamp', ''):
props['timestamp'] = int(time.time())
- self._metadata_store.store(uid, props)
- self._index_store.store(uid, props)
+ # TODO: create branch if required (inside some abstraction layer)
+ if file_path :
+ # only for data updates
+ vid = str(uuid.uuid4()) # use fake for now
- if os.path.exists(self._file_store.get_file_path(uid)) and \
- (not file_path or os.path.exists(file_path)):
- self._optimizer.remove(uid)
- self._file_store.store(uid, file_path, transfer_ownership,
+ self._metadata_store.store(uid, vid, props)
+ self._index_store.store(uid, vid, props)
+
+# if os.path.exists(self._file_store.get_file_path(uid, vid)) and \
+# (not file_path or os.path.exists(file_path)):
+# self._optimizer.remove(uid, vid)
+ self._file_store.store(uid, vid, file_path, transfer_ownership,
lambda *args: self._update_completion_cb(async_cb,
async_err_cb,
uid,
+ vid,
*args))
+ return (uid, vid)
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Updated(self, uid):
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss")
+ def Updated(self, uid, vid):
pass
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='a{sv}as',
out_signature='aa{sv}u')
+ @trace
def find(self, query, properties):
- logging.debug('datastore.find %r' % query)
- t = time.time()
-
+ uvids = None
if layoutmanager.get_instance().index_updated:
try:
- uids, count = self._index_store.find(query)
+ uvids, count = self._index_store.find(query)
except Exception:
logging.error('Failed to query index, will rebuild\n%s' \
% traceback.format_exc())
@@ -209,13 +218,13 @@ class DataStore(dbus.service.Object):
self._index_store.open_index()
self._rebuild_index()
- if not layoutmanager.get_instance().index_updated:
+ if uvids is None :
logging.warning('Index updating, returning all entries')
return self._find_all(query, properties)
entries = []
- for uid in uids:
- entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ for (uid, vid) in uvids:
+ entry_path = layoutmanager.get_instance().get_entry_path(uid, vid)
if not os.path.exists(entry_path):
logging.warning(
'Inconsistency detected, returning all entries')
@@ -228,7 +237,7 @@ class DataStore(dbus.service.Object):
return self._find_all(query, properties)
- metadata = self._metadata_store.retrieve(uid, properties)
+ metadata = self._metadata_store.retrieve(uid, vid, properties)
entries.append(metadata)
- logger.debug('find(): %r' % (time.time() - t))
@@ -236,42 +245,50 @@ class DataStore(dbus.service.Object):
return entries, count
def _find_all(self, query, properties):
- uids = layoutmanager.get_instance().find_all()
- count = len(uids)
+ uvids = layoutmanager.get_instance().find_all()
+ if not query.get('all_versions', False) :
+ # only return latest version for each entry
+ uids_vtime = {}
+ for (uid, vid) in uvids :
+ uids_vtime.setdefault(uid, []).append((vid, self._metadata_store.retrieve(uid, vid, 'timestamp')))
+ uvids = [(uid, sorted(candidates, key=lambda e: e[1], reverse=True)[0][0])
+ for (uid, candidates) in uids_vtime.items()]
+
+ count = len(uvids)
offset = query.get('offset', 0)
limit = query.get('limit', MAX_QUERY_LIMIT)
- uids = uids[offset:offset + limit]
+ uvids = uvids[offset:offset + limit]
entries = []
- for uid in uids:
- metadata = self._metadata_store.retrieve(uid, properties)
+ for (uid, vid) in uvids:
+ metadata = self._metadata_store.retrieve(uid, vid, properties)
entries.append(metadata)
return entries, count
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
+ in_signature='ss',
out_signature='s',
sender_keyword='sender')
- def get_filename(self, uid, sender=None):
- logging.debug('datastore.get_filename %r' % uid)
+ @trace
+ def get_filename(self, uid, vid, sender=None):
user_id = dbus.Bus().get_unix_user(sender)
- extension = self._get_extension(uid)
- return self._file_store.retrieve(uid, user_id, extension)
+ extension = self._get_extension(uid,vid)
+ return self._file_store.retrieve(uid, vid, user_id, extension)
- def _get_extension(self, uid):
- mime_type = self._metadata_store.get_property(uid, 'mime_type')
+ def _get_extension(self, uid, vid):
+ mime_type = self._metadata_store.get_property(uid, vid, 'mime_type')
if mime_type is None or not mime_type:
return ''
return mime.get_primary_extension(mime_type)
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
+ in_signature='ss',
out_signature='a{sv}')
- def get_properties(self, uid):
- logging.debug('datastore.get_properties %r' % uid)
- metadata = self._metadata_store.retrieve(uid)
+ @trace
+ def get_properties(self, uid, vid):
+ metadata = self._metadata_store.retrieve(uid,vid)
return metadata
@dbus.service.method(DS_DBUS_INTERFACE,
@@ -289,23 +306,24 @@ class DataStore(dbus.service.Object):
return []
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
+ in_signature='ss',
out_signature='')
- def delete(self, uid):
- self._optimizer.remove(uid)
+ def delete(self, uid, vid):
+ # TODO: vid=None/'' => remove all versions
+ self._optimizer.remove(uid, vid)
- self._index_store.delete(uid)
- self._file_store.delete(uid)
- self._metadata_store.delete(uid)
+ self._index_store.delete(uid, vid)
+ self._file_store.delete(uid, vid)
+ self._metadata_store.delete(uid, vid)
- entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ entry_path = layoutmanager.get_instance().get_entry_path(uid, vid)
os.removedirs(entry_path)
- self.Deleted(uid)
- logger.debug("deleted %s" % uid)
+ self.Deleted(uid, vid)
+ logger.debug("deleted (%r,%r)" % (uid,vid))
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Deleted(self, uid):
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss")
+ def Deleted(self, uid, vid):
pass
def stop(self):
@@ -316,29 +334,3 @@ class DataStore(dbus.service.Object):
@dbus.service.signal(DS_DBUS_INTERFACE)
def Stopped(self):
pass
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="sa{sv}",
- out_signature='s')
- def mount(self, uri, options=None):
- return ''
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="",
- out_signature="aa{sv}")
- def mounts(self):
- return [{'id': 1}]
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="s",
- out_signature="")
- def unmount(self, mountpoint_id):
- pass
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
- def Mounted(self, descriptior):
- pass
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
- def Unmounted(self, descriptor):
- pass
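
The version filtering in _find_all() above amounts to grouping hits by uid and keeping the vid with the newest timestamp. A self-contained rendition of that selection with made-up data:

    # Fabricated data; mirrors the selection done in _find_all() above.
    uvids = [('uid-a', 'vid-1'), ('uid-a', 'vid-2'), ('uid-b', 'vid-3')]
    timestamps = {('uid-a', 'vid-1'): 10, ('uid-a', 'vid-2'): 20,
                  ('uid-b', 'vid-3'): 15}

    uids_vtime = {}
    for (uid, vid) in uvids:
        uids_vtime.setdefault(uid, []).append((vid, timestamps[(uid, vid)]))
    latest = [(uid, sorted(candidates, key=lambda e: e[1], reverse=True)[0][0])
              for (uid, candidates) in uids_vtime.items()]
    # latest == [('uid-a', 'vid-2'), ('uid-b', 'vid-3')]  (dict order may vary)
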
diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py
index b96c323..bf77e5b 100644
--- a/src/carquinyol/filestore.py
+++ b/src/carquinyol/filestore.py
@@ -31,11 +31,11 @@ class FileStore(object):
# TODO: add protection against store and retrieve operations on entries
# that are being processed async.
- def store(self, uid, file_path, transfer_ownership, completion_cb):
+ def store(self, uid, vid, file_path, transfer_ownership, completion_cb):
"""Store a file for a given entry.
"""
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
@@ -83,10 +83,10 @@ class FileStore(object):
deleting this file.
"""
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
file_path = os.path.join(dir_path, 'data')
if not os.path.exists(file_path):
- logging.debug('Entry %r doesnt have any file' % uid)
+ logging.debug('Entry (%r,%r) doesn\'t have any file' % (uid,vid))
return ''
use_instance_dir = os.path.exists('/etc/olpc-security') and \
@@ -108,19 +108,19 @@ class FileStore(object):
elif extension:
extension = '.' + extension
- destination_path = os.path.join(destination_dir, uid + extension)
+ destination_path = os.path.join(destination_dir, "%s-%s%s" % (uid, vid, extension))
attempt = 1
while os.path.exists(destination_path):
if attempt > 10:
- fd_, destination_path = tempfile.mkstemp(prefix=uid,
+ fd_, destination_path = tempfile.mkstemp(prefix="%s-%s" % (uid,vid),
suffix=extension,
dir=destination_dir)
del fd_
os.unlink(destination_path)
break
else:
- file_name = '%s_%s%s' % (uid, attempt, extension)
+ file_name = '%s-%s_%s%s' % (uid, vid, attempt, extension)
destination_path = os.path.join(destination_dir, file_name)
attempt += 1
@@ -144,25 +144,25 @@ class FileStore(object):
return destination_path
- def get_file_path(self, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def get_file_path(self, uid, vid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
return os.path.join(dir_path, 'data')
- def delete(self, uid):
+ def delete(self, uid, vid):
"""Remove the file associated to a given entry.
"""
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
file_path = os.path.join(dir_path, 'data')
if os.path.exists(file_path):
os.remove(file_path)
- def hard_link_entry(self, new_uid, existing_uid):
+ def hard_link_entry(self, new_uid, new_vid, existing_uid, existing_vid):
existing_file = os.path.join(
- layoutmanager.get_instance().get_entry_path(existing_uid),
+ layoutmanager.get_instance().get_entry_path(existing_uid, existing_vid),
'data')
new_file = os.path.join(
- layoutmanager.get_instance().get_entry_path(new_uid),
+ layoutmanager.get_instance().get_entry_path(new_uid, new_vid),
'data')
logging.debug('removing %r' % new_file)
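
A small worked example (fabricated ids, not code from the patch) of the file names retrieve() now produces for the copy handed to the caller:

    uid = '0' * 36
    vid = '1' * 36
    extension = '.txt'
    first_choice = "%s-%s%s" % (uid, vid, extension)          # <uid>-<vid>.txt
    on_collision = '%s-%s_%s%s' % (uid, vid, 1, extension)    # <uid>-<vid>_1.txt
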
diff --git a/src/carquinyol/indexstore.py b/src/carquinyol/indexstore.py
index 42c3132..617ebd4 100644
--- a/src/carquinyol/indexstore.py
+++ b/src/carquinyol/indexstore.py
@@ -27,8 +27,10 @@ from carquinyol.layoutmanager import MAX_QUERY_LIMIT
_VALUE_UID = 0
_VALUE_TIMESTAMP = 1
_VALUE_TITLE = 2
+_VALUE_VID = 3
_PREFIX_UID = 'Q'
+_PREFIX_VID = 'V'
_PREFIX_ACTIVITY = 'A'
_PREFIX_ACTIVITY_ID = 'I'
_PREFIX_MIME_TYPE = 'M'
@@ -69,17 +71,19 @@ class IndexStore(object):
for f in os.listdir(index_path):
os.remove(os.path.join(index_path, f))
- def contains(self, uid):
- postings = self._database.postlist(_PREFIX_UID + uid)
+ def contains(self, uid, vid):
+ postings = self._database.postlist(_PREFIX_UID + uid + _PREFIX_VID + vid)
try:
postlist_item = postings.next()
except StopIteration:
return False
return True
- def store(self, uid, properties):
+ def store(self, uid, vid, properties):
document = Document()
+ document.add_term("%s%s%s%s" % (_PREFIX_UID, uid, _PREFIX_VID, vid))
document.add_term(_PREFIX_UID + uid)
+ document.add_term(_PREFIX_VID + vid)
document.add_term(_PREFIX_ACTIVITY + properties.get('activity', ''))
document.add_term(_PREFIX_MIME_TYPE + properties.get('mime_type', ''))
document.add_term(_PREFIX_ACTIVITY_ID +
@@ -87,6 +91,7 @@ class IndexStore(object):
document.add_term(_PREFIX_KEEP + str(properties.get('keep', 0)))
document.add_value(_VALUE_UID, uid)
+ document.add_value(_VALUE_VID, vid)
document.add_value(_VALUE_TIMESTAMP, str(properties['timestamp']))
document.add_value(_VALUE_TITLE, properties.get('title', '').strip())
@@ -107,10 +112,10 @@ class IndexStore(object):
term_generator.index_text_without_positions(
self._extract_text(properties), 1, '')
- if not self.contains(uid):
+ if not self.contains(uid, vid):
self._database.add_document(document)
else:
- self._database.replace_document(_PREFIX_UID + uid, document)
+ self._database.replace_document("%s%s%s%s" % (_PREFIX_UID, uid, _PREFIX_VID, vid), document)
self._flush()
def _extract_text(self, properties):
@@ -129,6 +134,7 @@ class IndexStore(object):
def find(self, query):
offset = query.pop('offset', 0)
limit = query.pop('limit', MAX_QUERY_LIMIT)
+ all_versions = query.pop('all_versions', False)
order_by = query.pop('order_by', [])
enquire = Enquire(self._database)
@@ -156,11 +162,25 @@ class IndexStore(object):
query_result = enquire.get_mset(offset, limit, check_at_least)
total_count = query_result.get_matches_estimated()
- uids = []
+ uvids = []
+ timestamps = []
for hit in query_result:
- uids.append(hit.document.get_value(_VALUE_UID))
+ uvids.append((hit.document.get_value(_VALUE_UID), hit.document.get_value(_VALUE_VID)))
- return (uids, total_count)
+ if not all_versions :
+ # only return latest version for each entry
+ # result set is already sorted by time so we only need to take the first entry for each uid
+ uids_vid = {}
+ uvids_new = []
+ for (uid, vid) in uvids :
+ if uid not in uids_vid :
+ uids_vid[uid] = vid
+ uvids_new.append((uid,vid))
+
+ uvids = uvids_new
+
+ # TODO: total_count will be totally off if all_versions is not set
+ return (uvids, total_count)
def _parse_query(self, query_dict):
logging.debug('_parse_query %r' % query_dict)
@@ -193,10 +213,15 @@ class IndexStore(object):
query = Query(Query.OP_VALUE_RANGE, _VALUE_TIMESTAMP, start, end)
queries.append(query)
+ # TODO: refactor / simplify
uid = query_dict.pop('uid', None)
if uid is not None:
queries.append(Query(_PREFIX_UID + uid))
+ vid = query_dict.pop('vid', None)
+ if vid is not None:
+ queries.append(Query(_PREFIX_VID + vid))
+
activity = query_dict.pop('activity', None)
if activity is not None:
queries.append(Query(_PREFIX_ACTIVITY + activity))
@@ -226,8 +251,8 @@ class IndexStore(object):
return Query(Query.OP_AND, queries)
- def delete(self, uid):
- self._database.delete_document(_PREFIX_UID + uid)
+ def delete(self, uid, vid):
+ self._database.delete_document("%s%s%s%s" % (_PREFIX_UID, uid, _PREFIX_VID, vid))
def get_activities(self):
activities = []
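
For clarity, these are the terms a stored document now carries (illustrative values): the combined Q<uid>V<vid> term is the per-version identifier used by replace_document() and delete_document() above, while the plain Q and V terms keep uid-only and vid-only queries working:

    uid = 'a' * 36
    vid = 'b' * 36
    per_version_id = 'Q' + uid + 'V' + vid   # _PREFIX_UID + uid + _PREFIX_VID + vid
    per_object_term = 'Q' + uid              # shared by every version of the entry
    version_term = 'V' + vid                 # matches exactly one version
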
diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py
index 42db46f..f14c9e6 100644
--- a/src/carquinyol/layoutmanager.py
+++ b/src/carquinyol/layoutmanager.py
@@ -61,9 +61,9 @@ class LayoutManager(object):
version_path = os.path.join(self._root_path, 'version')
open(version_path, 'w').write(str(version))
- def get_entry_path(self, uid):
+ def get_entry_path(self, uid, vid):
# os.path.join() is just too slow
- return '%s/%s/%s' % (self._root_path, uid[:2], uid)
+ return '%s/%s/%s-%s' % (self._root_path, uid[:2], uid, vid)
def get_root_path(self):
return self._root_path
@@ -93,13 +93,13 @@ class LayoutManager(object):
index_updated = property(_is_index_updated, _set_index_updated)
def find_all(self):
- uids = []
+ uvids = []
for f in os.listdir(self._root_path):
if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2:
for g in os.listdir(os.path.join(self._root_path, f)):
- if len(g) == 36:
- uids.append(g)
- return uids
+ if len(g) == 73:
+ uvids.append((g[:36], g[37:]))
+ return uvids
def _is_empty(self):
for f in os.listdir(self._root_path):
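
A short worked example (hypothetical root path, not from the patch) of the new on-disk layout that find_all() parses back apart:

    uid = 'f' * 36
    vid = '0' * 36
    entry_dir = '%s/%s/%s-%s' % ('/path/to/store', uid[:2], uid, vid)
    name = '%s-%s' % (uid, vid)
    assert len(name) == 73                       # 36 + 1 + 36
    assert (name[:36], name[37:]) == (uid, vid)  # what find_all() appends
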
diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py
index 8461ef7..ea0b389 100644
--- a/src/carquinyol/metadatastore.py
+++ b/src/carquinyol/metadatastore.py
@@ -7,9 +7,8 @@ MAX_SIZE = 256
class MetadataStore(object):
-
- def store(self, uid, metadata):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def store(self, uid, vid, metadata):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
@@ -21,6 +20,7 @@ class MetadataStore(object):
os.remove(os.path.join(metadata_path, key))
metadata['uid'] = uid
+ metadata['vid'] = vid
for key, value in metadata.items():
# Hack to support activities that still pass properties named as
@@ -38,19 +38,19 @@ class MetadataStore(object):
finally:
f.close()
- def retrieve(self, uid, properties=None):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def retrieve(self, uid, vid, properties=None):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
return metadatareader.retrieve(dir_path, properties)
- def delete(self, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def delete(self, uid, vid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
metadata_path = os.path.join(dir_path, 'metadata')
for key in os.listdir(metadata_path):
os.remove(os.path.join(metadata_path, key))
os.rmdir(metadata_path)
- def get_property(self, uid, key):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def get_property(self, uid, vid, key):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
metadata_path = os.path.join(dir_path, 'metadata')
property_path = os.path.join(metadata_path, key)
if os.path.exists(property_path):
@@ -58,8 +58,8 @@ class MetadataStore(object):
else:
return None
- def set_property(self, uid, key, value):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def set_property(self, uid, vid, key, value):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
metadata_path = os.path.join(dir_path, 'metadata')
property_path = os.path.join(metadata_path, key)
open(property_path, 'w').write(value)
diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py
index 02f8e68..8ff63b3 100644
--- a/src/carquinyol/migration.py
+++ b/src/carquinyol/migration.py
@@ -22,6 +22,7 @@ import logging
import shutil
import time
import traceback
+import uuid
import cjson
@@ -31,7 +32,7 @@ DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
def migrate_from_0():
- logging.info('Migrating datastore from version 0 to version 1')
+ logging.info('Migrating datastore from version 0 to version 2')
root_path = layoutmanager.get_instance().get_root_path()
old_root_path = os.path.join(root_path, 'store')
@@ -44,10 +45,11 @@ def migrate_from_0():
continue
logging.debug('Migrating entry %r' % uid)
+ vid = str(uuid.uuid4())
try:
- _migrate_metadata(root_path, old_root_path, uid)
- _migrate_file(root_path, old_root_path, uid)
- _migrate_preview(root_path, old_root_path, uid)
+ _migrate_metadata_0(root_path, old_root_path, uid, vid)
+ _migrate_file_0(root_path, old_root_path, uid, vid)
+ _migrate_preview_0(root_path, old_root_path, uid, vid)
except Exception:
logging.error('Error while migrating entry %r: %s\n' % \
(uid, traceback.format_exc()))
@@ -59,8 +61,8 @@ def migrate_from_0():
logging.info('Migration finished')
-def _migrate_metadata(root_path, old_root_path, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+def _migrate_metadata_0(root_path, old_root_path, uid, vid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
metadata_path = os.path.join(dir_path, 'metadata')
os.makedirs(metadata_path)
@@ -70,6 +72,8 @@ def _migrate_metadata(root_path, old_root_path, uid):
if 'uid' not in metadata:
metadata['uid'] = uid
+ metadata['vid'] = vid
+
if 'timestamp' not in metadata and 'mtime' in metadata:
metadata['timestamp'] = \
time.mktime(time.strptime(metadata['mtime'], DATE_FORMAT))
@@ -91,15 +95,62 @@ def _migrate_metadata(root_path, old_root_path, uid):
(key, uid, traceback.format_exc()))
-def _migrate_file(root_path, old_root_path, uid):
+def _migrate_file_0(root_path, old_root_path, uid, vid):
if os.path.exists(os.path.join(old_root_path, uid)):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
os.rename(os.path.join(old_root_path, uid),
os.path.join(dir_path, 'data'))
-def _migrate_preview(root_path, old_root_path, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+def _migrate_preview_0(root_path, old_root_path, uid, vid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
metadata_path = os.path.join(dir_path, 'metadata')
os.rename(os.path.join(old_root_path, 'preview', uid),
os.path.join(metadata_path, 'preview'))
+
+
+def migrate_from_1():
+ logging.info('Migrating datastore from version 1 to version 2')
+
+ root_path = layoutmanager.get_instance().get_root_path()
+ checksum_path = os.path.join(root_path, "checksums")
+
+ vids = {}
+ for hash02 in os.listdir(root_path):
+ if len(hash02) != 2 :
+ continue
+
+ for uid in os.listdir(os.path.join(root_path, hash02)) :
+ if (len(uid) != 36) :
+ continue
+
+ logging.debug('Migrating entry %r' % uid)
+
+ vid = str(uuid.uuid4())
+ vids[uid] = vid
+ try:
+ new_path = layoutmanager.get_instance().get_entry_path(uid, vid)
+ os.rename(os.path.join(root_path, hash02, uid),
+ new_path)
+ file(os.path.join(new_path, "metadata", "vid"), "w").write(vid)
+
+ except Exception:
+ logging.error('Error while migrating entry %r: %s\n' % \
+ (uid, traceback.format_exc()))
+
+ for checksum in os.listdir(checksum_path) :
+ entries_path = os.path.join(checksum_path, checksum)
+ for uid in os.listdir(entries_path) :
+ if len(uid) != 36 :
+ continue
+
+ try :
+ os.rename(os.path.join(entries_path, uid),
+ os.path.join(entries_path, "%s-%s" % (uid,vids[uid])))
+
+ except Exception:
+ logging.error('Error while migrating checksum entry %r / %r: %s\n' % \
+ (checksum, uid, traceback.format_exc()))
+
+ logging.info('Migration finished')
+
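
Condensed, standalone sketch of what migrate_from_1() does to a single entry directory, using plain paths instead of the layoutmanager (hypothetical helper, same steps as above):

    import os
    import uuid

    def migrate_entry(root_path, hash02, uid):
        # Assign a fresh version id, rename <uid> to <uid>-<vid> and record
        # the vid alongside the other per-property metadata files.
        vid = str(uuid.uuid4())
        old_path = os.path.join(root_path, hash02, uid)
        new_path = os.path.join(root_path, hash02, '%s-%s' % (uid, vid))
        os.rename(old_path, new_path)
        open(os.path.join(new_path, 'metadata', 'vid'), 'w').write(vid)
        return vid
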
diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py
index 6cb1374..361afef 100644
--- a/src/carquinyol/optimizer.py
+++ b/src/carquinyol/optimizer.py
@@ -18,6 +18,7 @@ import os
import errno
import logging
import subprocess
+import uuid
import gobject
@@ -33,33 +34,34 @@ class Optimizer(object):
self._metadata_store = metadata_store
self._enqueue_checksum_id = None
- def optimize(self, uid):
+ def optimize(self, uid, vid):
"""Add an entry to a queue of entries to be checked for duplicates.
"""
- if not os.path.exists(self._file_store.get_file_path(uid)):
+ if not os.path.exists(self._file_store.get_file_path(uid, vid)):
return
queue_path = layoutmanager.get_instance().get_queue_path()
- open(os.path.join(queue_path, uid), 'w').close()
- logging.debug('optimize %r' % os.path.join(queue_path, uid))
+ fname = os.path.join(queue_path, "%s-%s" % (uid, vid))
+ open(fname, 'w').close()
+ logging.debug('optimize %r' % fname)
if self._enqueue_checksum_id is None:
self._enqueue_checksum_id = \
gobject.idle_add(self._process_entry_cb,
priority=gobject.PRIORITY_LOW)
- def remove(self, uid):
+ def remove(self, uid, vid):
"""Remove any structures left from space optimization
"""
- checksum = self._metadata_store.get_property(uid, 'checksum')
+ checksum = self._metadata_store.get_property(uid, vid, 'checksum')
if checksum is None:
return
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- checksum_entry_path = os.path.join(checksum_path, uid)
+ checksum_entry_path = os.path.join(checksum_path, "%s-%s" % (uid,vid))
if os.path.exists(checksum_entry_path):
logging.debug('remove %r' % checksum_entry_path)
@@ -81,14 +83,14 @@ class Optimizer(object):
checksum_path = os.path.join(checksums_dir, checksum)
return os.path.exists(checksum_path)
- def _get_uid_from_checksum(self, checksum):
+ def _get_uvid_from_checksum(self, checksum):
"""Get an existing entry which file matches checksum.
"""
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- first_uid = os.listdir(checksum_path)[0]
- return first_uid
+ first_uvid = os.listdir(checksum_path)[0]
+ return (first_uvid[:36], first_uvid[37:])
def _create_checksum_dir(self, checksum):
"""Create directory that tracks files with this same checksum.
@@ -99,24 +101,25 @@ class Optimizer(object):
logging.debug('create dir %r' % checksum_path)
os.mkdir(checksum_path)
- def _add_checksum_entry(self, uid, checksum):
- """Create a file in the checksum dir with the uid of the entry
+ def _add_checksum_entry(self, uid, vid, checksum):
+ """Create a file in the checksum dir with the uid and vid of the entry
"""
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- logging.debug('touch %r' % os.path.join(checksum_path, uid))
- open(os.path.join(checksum_path, uid), 'w').close()
+ fname = os.path.join(checksum_path, "%s-%s" % (uid,vid))
+ logging.debug('touch %r' % fname)
+ open(fname, 'w').close()
- def _already_linked(self, uid, checksum):
+ def _already_linked(self, uid, vid, checksum):
"""Check if this entry's file is already a hard link to the checksums
dir.
"""
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- return os.path.exists(os.path.join(checksum_path, uid))
+ return os.path.exists(os.path.join(checksum_path, "%s-%s" % (uid,vid)))
def _process_entry_cb(self):
"""Process one item in the checksums queue by calculating its checksum,
@@ -127,30 +130,30 @@ class Optimizer(object):
queue_path = layoutmanager.get_instance().get_queue_path()
queue = os.listdir(queue_path)
if queue:
- uid = queue[0]
- logging.debug('_process_entry_cb processing %r' % uid)
+ (uid,vid) = queue[0][:36], queue[0][37:]
+ logging.debug('_process_entry_cb processing (%r,%r)' % (uid,vid))
- file_in_entry_path = self._file_store.get_file_path(uid)
+ file_in_entry_path = self._file_store.get_file_path(uid,vid)
if not os.path.exists(file_in_entry_path):
- logging.info('non-existent entry in queue: %r' % uid)
+ logging.info('non-existent entry in queue: (%r,%r)' % (uid,vid))
else:
checksum = self._calculate_md5sum(file_in_entry_path)
- self._metadata_store.set_property(uid, 'checksum', checksum)
+ self._metadata_store.set_property(uid, vid, 'checksum', checksum)
if self._identical_file_already_exists(checksum):
- if not self._already_linked(uid, checksum):
- existing_entry_uid = \
- self._get_uid_from_checksum(checksum)
+ if not self._already_linked(uid, vid, checksum):
+ existing_entry_uvid = \
+ self._get_uvid_from_checksum(checksum)
- self._file_store.hard_link_entry(uid,
- existing_entry_uid)
+ self._file_store.hard_link_entry(uid, vid,
+ *existing_entry_uvid)
- self._add_checksum_entry(uid, checksum)
+ self._add_checksum_entry(uid, vid, checksum)
else:
self._create_checksum_dir(checksum)
- self._add_checksum_entry(uid, checksum)
+ self._add_checksum_entry(uid, vid, checksum)
- os.remove(os.path.join(queue_path, uid))
+ os.remove(os.path.join(queue_path, "%s-%s" % (uid,vid)))
if len(queue) <= 1:
self._enqueue_checksum_id = None
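
To make the checksum bookkeeping above easier to follow, here is a toy in-memory rendition (fabricated data, not datastore code): each checksum directory holds one marker file per '<uid>-<vid>', and a later version with identical content gets hard-linked to the data of whichever entry is already listed:

    checksums = {}   # checksum -> list of '<uid>-<vid>' marker names

    def note_version(checksum, uid, vid):
        entries = checksums.setdefault(checksum, [])
        name = '%s-%s' % (uid, vid)
        link_target = entries[0] if entries else None   # existing data to hard-link to
        if name not in entries:
            entries.append(name)
        return link_target

    assert note_version('abc', 'u' * 36, '1' * 36) is None                   # first copy kept
    assert note_version('abc', 'u' * 36, '2' * 36) == 'u' * 36 + '-' + '1' * 36
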