Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summary · refs · log · tree · commit · diff · stats
diff options
context:
space:
mode:
author Sascha Silbe <sascha@silbe.org>2009-06-22 00:30:00 (GMT)
committer Sascha Silbe <sascha@silbe.org>2009-06-22 00:30:00 (GMT)
commit2ae002dfe45d59ad1ac3afe955f9d262461f8d6d (patch)
tree4925557ed385d12416423b04b558ac667388766a
parent6b679e122c2a45e29b61287eef3c083822535f49 (diff)
prototype take 1: add version_id / vid everywhere
-rw-r--r--src/carquinyol/__init__.py7
-rw-r--r--src/carquinyol/datastore.py209
-rw-r--r--src/carquinyol/filestore.py30
-rw-r--r--src/carquinyol/indexstore.py52
-rw-r--r--src/carquinyol/layoutmanager.py12
-rw-r--r--src/carquinyol/metadatastore.py21
-rw-r--r--src/carquinyol/migration.py70
-rw-r--r--src/carquinyol/optimizer.py61
8 files changed, 275 insertions(+), 187 deletions(-)
diff --git a/src/carquinyol/__init__.py b/src/carquinyol/__init__.py
index 8b13789..d53da54 100644
--- a/src/carquinyol/__init__.py
+++ b/src/carquinyol/__init__.py
@@ -1 +1,8 @@
+import logging
+import decorator
+
+@decorator.decorator
+def trace(f, *args, **kwargs) :
+ logging.debug("%s(%s)" % (f.__name__, ", ".join([repr(a) for a in args]+['%s=%r' % (k,v) for (k,v) in kwargs.items()]))),
+ return f(*args, **kwargs)
diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py
index a118e03..8a04ca9 100644
--- a/src/carquinyol/datastore.py
+++ b/src/carquinyol/datastore.py
@@ -26,6 +26,7 @@ import gobject
from sugar import mime
+from carquinyol import trace
from carquinyol import layoutmanager
from carquinyol import migration
from carquinyol.layoutmanager import MAX_QUERY_LIMIT
@@ -56,7 +57,12 @@ class DataStore(dbus.service.Object):
layout_manager = layoutmanager.get_instance()
if layout_manager.get_version() == 0:
migration.migrate_from_0()
- layout_manager.set_version(1)
+ layout_manager.set_version(2)
+ layout_manager.index_updated = False
+
+ if layout_manager.get_version() == 1:
+ migration.migrate_from_1()
+ layout_manager.set_version(2)
layout_manager.index_updated = False
self._metadata_store = MetadataStore()
@@ -80,120 +86,129 @@ class DataStore(dbus.service.Object):
self._optimizer = Optimizer(self._file_store, self._metadata_store)
def _rebuild_index(self):
- uids = layoutmanager.get_instance().find_all()
- logging.debug('Going to update the index with uids %r' % uids)
- gobject.idle_add(lambda: self.__rebuild_index_cb(uids),
+ uvids = layoutmanager.get_instance().find_all()
+ logging.debug('Going to update the index with uvids %r' % uvids)
+ gobject.idle_add(lambda: self.__rebuild_index_cb(uvids),
priority=gobject.PRIORITY_LOW)
- def __rebuild_index_cb(self, uids):
- if uids:
- uid = uids.pop()
+ def __rebuild_index_cb(self, uvids):
+ if uvids:
+ (uid,vid) = uvids.pop()
- logging.debug('Updating entry %r in index. %d to go.' % \
- (uid, len(uids)))
+ logging.debug('Updating entry (%r,%r) in index. %d to go.' % \
+ (uid, vid, len(uvids)))
- if not self._index_store.contains(uid):
+ if not self._index_store.contains(uid,vid):
try:
- props = self._metadata_store.retrieve(uid)
- self._index_store.store(uid, props)
+ props = self._metadata_store.retrieve(uid,vid)
+ self._index_store.store(uid, vid, props)
except Exception:
- logging.error('Error processing %r\n%s.' \
- % (uid, traceback.format_exc()))
+ logging.error('Error processing (%r,%r)\n%s.' \
+ % (uid, vid, traceback.format_exc()))
- if not uids:
+ if not uvids:
logging.debug('Finished updating index.')
layoutmanager.get_instance().index_updated = True
return False
else:
return True
- def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
- logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \
- (async_cb, async_err_cb, uid, exc))
+ @trace
+ def _create_completion_cb(self, async_cb, async_err_cb, uid, vid, exc=None):
if exc is not None:
async_err_cb(exc)
return
- self.Created(uid)
- self._optimizer.optimize(uid)
- logger.debug("created %s" % uid)
- async_cb(uid)
+ self.Created(uid, vid)
+ self._optimizer.optimize(uid, vid)
+ logger.debug("created (%s,%s)" % (uid,vid))
+ async_cb(uid,vid)
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='a{sv}sb',
- out_signature='s',
+ out_signature='ss',
async_callbacks=('async_cb', 'async_err_cb'),
byte_arrays=True)
def create(self, props, file_path, transfer_ownership,
async_cb, async_err_cb):
uid = str(uuid.uuid4())
- logging.debug('datastore.create %r' % uid)
+ vid = str(uuid.uuid4()) # use fake for now
+ logging.debug('datastore.create %r %r' % (uid, vid))
if not props.get('timestamp', ''):
props['timestamp'] = int(time.time())
- self._metadata_store.store(uid, props)
- self._index_store.store(uid, props)
- self._file_store.store(uid, file_path, transfer_ownership,
+ self._metadata_store.store(uid, vid, props)
+ self._index_store.store(uid, vid, props)
+ self._file_store.store(uid, vid, file_path, transfer_ownership,
lambda *args: self._create_completion_cb(async_cb,
async_err_cb,
uid,
+ vid,
*args))
+ return (uid, vid)
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Created(self, uid):
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss")
+ def Created(self, uid, vid):
pass
- def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
- logger.debug("_update_completion_cb() called with %r / %r, exc %r" % \
- (async_cb, async_err_cb, exc))
+ @trace
+ def _update_completion_cb(self, async_cb, async_err_cb, uid, vid, exc=None):
if exc is not None:
async_err_cb(exc)
return
- self.Updated(uid)
- self._optimizer.optimize(uid)
- logger.debug("updated %s" % uid)
- async_cb()
+ self.Updated(uid,vid)
+ self._optimizer.optimize(uid,vid)
+ logger.debug("updated %s %s" % (uid, vid))
+ async_cb(uid, vid)
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='sa{sv}sb',
- out_signature='',
+ in_signature='ssa{sv}sb',
+ out_signature='ss',
async_callbacks=('async_cb', 'async_err_cb'),
byte_arrays=True)
- def update(self, uid, props, file_path, transfer_ownership,
+ @trace
+ def update(self, uid, vid, props, file_path, transfer_ownership,
async_cb, async_err_cb):
- logging.debug('datastore.update %r' % uid)
if not props.get('timestamp', ''):
props['timestamp'] = int(time.time())
- self._metadata_store.store(uid, props)
- self._index_store.store(uid, props)
+ # TODO: create branch if required (inside some abstraction layer)
+ if file_path :
+ # only for data updates
+ vid = str(uuid.uuid4()) # use fake for now
- if os.path.exists(self._file_store.get_file_path(uid)) and \
- (not file_path or os.path.exists(file_path)):
- self._optimizer.remove(uid)
- self._file_store.store(uid, file_path, transfer_ownership,
+ self._metadata_store.store(uid, vid, props)
+ self._index_store.store(uid, vid, props)
+
+# if os.path.exists(self._file_store.get_file_path(uid, vid)) and \
+# (not file_path or os.path.exists(file_path)):
+# self._optimizer.remove(uid, vid)
+ self._file_store.store(uid, vid, file_path, transfer_ownership,
lambda *args: self._update_completion_cb(async_cb,
async_err_cb,
uid,
+ vid,
*args))
+ return (uid, vid)
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Updated(self, uid):
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss")
+ def Updated(self, uid, vid):
pass
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='a{sv}as',
out_signature='aa{sv}u')
+ @trace
def find(self, query, properties):
- logging.debug('datastore.find %r' % query)
t = time.time()
+ uvids = None
if layoutmanager.get_instance().index_updated:
try:
- uids, count = self._index_store.find(query)
+ uvids, count = self._index_store.find(query)
except Exception:
logging.error('Failed to query index, will rebuild\n%s' \
% traceback.format_exc())
@@ -203,45 +218,56 @@ class DataStore(dbus.service.Object):
self._index_store.open_index()
self._rebuild_index()
- if not layoutmanager.get_instance().index_updated:
+ if uvids is None :
logging.warning('Index updating, returning all entries')
- uids = layoutmanager.get_instance().find_all()
- count = len(uids)
+ uvids = layoutmanager.get_instance().find_all()
+ if not query.get('all_versions', False) :
+ # only return latest version for each entry
+ uids_vtime = {}
+ for (uid, vid) in uvids :
+ uids_vtime.setdefault(uid, []).append((vid, self._metadata_store.retrieve(uid, vid, 'timestamp')))
+
+ uvids = [(uid, sorted(candidates, key=lambda e: e[1], reverse=True)[0][0])
+ for (uid, candidates) in uids_vtime.items()]
+
+ count = len(uvids)
offset = query.get('offset', 0)
limit = query.get('limit', MAX_QUERY_LIMIT)
- uids = uids[offset:offset + limit]
+ uvids = uvids[offset:offset + limit]
+ logger.debug('uvids=%r' % (uvids,))
entries = []
- for uid in uids:
- metadata = self._metadata_store.retrieve(uid, properties)
+ for (uid,vid) in uvids:
+ metadata = self._metadata_store.retrieve(uid, vid, properties)
entries.append(metadata)
logger.debug('find(): %r' % (time.time() - t))
+ logger.debug('count=%r, entries=%r' % (count, entries))
return entries, count
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
+ in_signature='ss',
out_signature='s',
sender_keyword='sender')
- def get_filename(self, uid, sender=None):
- logging.debug('datastore.get_filename %r' % uid)
+ @trace
+ def get_filename(self, uid, vid, sender=None):
user_id = dbus.Bus().get_unix_user(sender)
- extension = self._get_extension(uid)
- return self._file_store.retrieve(uid, user_id, extension)
+ extension = self._get_extension(uid,vid)
+ return self._file_store.retrieve(uid, vid, user_id, extension)
- def _get_extension(self, uid):
- mime_type = self._metadata_store.get_property(uid, 'mime_type')
+ def _get_extension(self, uid, vid):
+ mime_type = self._metadata_store.get_property(uid, vid, 'mime_type')
if mime_type is None or not mime_type:
return ''
return mime.get_primary_extension(mime_type)
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
+ in_signature='ss',
out_signature='a{sv}')
- def get_properties(self, uid):
- logging.debug('datastore.get_properties %r' % uid)
- metadata = self._metadata_store.retrieve(uid)
+ @trace
+ def get_properties(self, uid, vid):
+ metadata = self._metadata_store.retrieve(uid,vid)
return metadata
@dbus.service.method(DS_DBUS_INTERFACE,
@@ -259,23 +285,24 @@ class DataStore(dbus.service.Object):
return []
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
+ in_signature='ss',
out_signature='')
- def delete(self, uid):
- self._optimizer.remove(uid)
+ def delete(self, uid, vid):
+ # TODO: vid=None/'' => remove all versions
+ self._optimizer.remove(uid, vid)
- self._index_store.delete(uid)
- self._file_store.delete(uid)
- self._metadata_store.delete(uid)
+ self._index_store.delete(uid, vid)
+ self._file_store.delete(uid, vid)
+ self._metadata_store.delete(uid, vid)
- entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ entry_path = layoutmanager.get_instance().get_entry_path(uid, vid)
os.removedirs(entry_path)
- self.Deleted(uid)
- logger.debug("deleted %s" % uid)
+ self.Deleted(uid, vid)
+ logger.debug("deleted (%r,%r)" % (uid,vid))
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Deleted(self, uid):
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss")
+ def Deleted(self, uid, vid):
pass
def stop(self):
@@ -287,29 +314,3 @@ class DataStore(dbus.service.Object):
def Stopped(self):
pass
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="sa{sv}",
- out_signature='s')
- def mount(self, uri, options=None):
- return ''
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="",
- out_signature="aa{sv}")
- def mounts(self):
- return [{'id': 1}]
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="s",
- out_signature="")
- def unmount(self, mountpoint_id):
- pass
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
- def Mounted(self, descriptior):
- pass
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
- def Unmounted(self, descriptor):
- pass
-
diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py
index f88c531..7a4591c 100644
--- a/src/carquinyol/filestore.py
+++ b/src/carquinyol/filestore.py
@@ -29,11 +29,11 @@ class FileStore(object):
# TODO: add protection against store and retrieve operations on entries
# that are being processed async.
- def store(self, uid, file_path, transfer_ownership, completion_cb):
+ def store(self, uid, vid, file_path, transfer_ownership, completion_cb):
"""Store a file for a given entry.
"""
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
@@ -75,15 +75,15 @@ class FileStore(object):
async_copy = AsyncCopy(file_path, destination_path, completion_cb)
async_copy.start()
- def retrieve(self, uid, user_id, extension):
+ def retrieve(self, uid, vid, user_id, extension):
"""Place the file associated to a given entry into a directory where the
user can read it. The caller is reponsible for deleting this file.
"""
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
file_path = os.path.join(dir_path, 'data')
if not os.path.exists(file_path):
- logging.debug('Entry %r doesnt have any file' % uid)
+ logging.debug('Entry (%r,%r) doesnt have any file' % (uid,vid))
return ''
use_instance_dir = os.path.exists('/etc/olpc-security') and \
@@ -105,19 +105,19 @@ class FileStore(object):
elif extension:
extension = '.' + extension
- destination_path = os.path.join(destination_dir, uid + extension)
+ destination_path = os.path.join(destination_dir, "%s-%s%s" % (uid, vid, extension))
attempt = 1
while os.path.exists(destination_path):
if attempt > 10:
- fd_, destination_path = tempfile.mkstemp(prefix=uid,
+ fd_, destination_path = tempfile.mkstemp(prefix="%s-%s" % (uid,vid),
suffix=extension,
dir=destination_dir)
del fd_
os.unlink(destination_path)
break
else:
- file_name = '%s_%s%s' % (uid, attempt, extension)
+ file_name = '%s-%s_%s%s' % (uid, vid, attempt, extension)
destination_path = os.path.join(destination_dir, file_name)
attempt += 1
@@ -141,25 +141,25 @@ class FileStore(object):
return destination_path
- def get_file_path(self, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def get_file_path(self, uid, vid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
return os.path.join(dir_path, 'data')
- def delete(self, uid):
+ def delete(self, uid, vid):
"""Remove the file associated to a given entry.
"""
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
file_path = os.path.join(dir_path, 'data')
if os.path.exists(file_path):
os.remove(file_path)
- def hard_link_entry(self, new_uid, existing_uid):
+ def hard_link_entry(self, new_uid, new_vid, existing_uid, existing_vid):
existing_file = os.path.join(
- layoutmanager.get_instance().get_entry_path(existing_uid),
+ layoutmanager.get_instance().get_entry_path(existing_uid, existing_vid),
'data')
new_file = os.path.join(
- layoutmanager.get_instance().get_entry_path(new_uid),
+ layoutmanager.get_instance().get_entry_path(new_uid, new_vid),
'data')
logging.debug('removing %r' % new_file)
diff --git a/src/carquinyol/indexstore.py b/src/carquinyol/indexstore.py
index 62aebb4..a814f7a 100644
--- a/src/carquinyol/indexstore.py
+++ b/src/carquinyol/indexstore.py
@@ -26,8 +26,10 @@ from carquinyol.layoutmanager import MAX_QUERY_LIMIT
_VALUE_UID = 0
_VALUE_TIMESTAMP = 1
+_VALUE_VID = 2
_PREFIX_UID = 'Q'
+_PREFIX_VID = 'V'
_PREFIX_ACTIVITY = 'A'
_PREFIX_ACTIVITY_ID = 'I'
_PREFIX_MIME_TYPE = 'M'
@@ -66,17 +68,19 @@ class IndexStore(object):
for f in os.listdir(index_path):
os.remove(os.path.join(index_path, f))
- def contains(self, uid):
- postings = self._database.postlist(_PREFIX_UID + uid)
+ def contains(self, uid, vid):
+ postings = self._database.postlist(_PREFIX_UID + uid + _PREFIX_VID + vid)
try:
postlist_item = postings.next()
except StopIteration:
return False
return True
- def store(self, uid, properties):
+ def store(self, uid, vid, properties):
document = Document()
+ document.add_term("%s%s%s%s" % (_PREFIX_UID, uid, _PREFIX_VID, vid))
document.add_term(_PREFIX_UID + uid)
+ document.add_term(_PREFIX_VID + vid)
document.add_term(_PREFIX_ACTIVITY + properties.get('activity', ''))
document.add_term(_PREFIX_MIME_TYPE + properties.get('mime_type', ''))
document.add_term(_PREFIX_ACTIVITY_ID +
@@ -84,6 +88,7 @@ class IndexStore(object):
document.add_term(_PREFIX_KEEP + str(properties.get('keep', 0)))
document.add_value(_VALUE_UID, uid)
+ document.add_value(_VALUE_VID, vid)
document.add_value(_VALUE_TIMESTAMP, str(properties['timestamp']))
term_generator = xapian.TermGenerator()
@@ -103,10 +108,10 @@ class IndexStore(object):
term_generator.index_text_without_positions(
self._extract_text(properties), 1, '')
- if not self.contains(uid):
+ if not self.contains(uid, vid):
self._database.add_document(document)
else:
- self._database.replace_document(_PREFIX_UID + uid, document)
+ self._database.replace_document("%s%s%s%s" % (_PREFIX_UID, uid, _PREFIX_VID, vid), document)
self._flush()
def _extract_text(self, properties):
@@ -124,10 +129,12 @@ class IndexStore(object):
def find(self, query):
enquire = Enquire(self._database)
- enquire.set_query(self._parse_query(query))
- offset = query.get('offset', 0)
- limit = query.get('limit', MAX_QUERY_LIMIT)
+ offset = query.pop('offset', 0)
+ limit = query.pop('limit', MAX_QUERY_LIMIT)
+ all_versions = query.pop('all_versions', False)
+
+ enquire.set_query(self._parse_query(query))
# This will assure that the results count is exact.
check_at_least = offset + limit + 1
@@ -137,11 +144,25 @@ class IndexStore(object):
query_result = enquire.get_mset(offset, limit, check_at_least)
total_count = query_result.get_matches_estimated()
- uids = []
+ uvids = []
+ timestamps = []
for hit in query_result:
- uids.append(hit.document.get_value(_VALUE_UID))
+ uvids.append((hit.document.get_value(_VALUE_UID), hit.document.get_value(_VALUE_VID)))
- return (uids, total_count)
+ if not all_versions :
+ # only return latest version for each entry
+ # result set is already sorted by time so we only need to take the first entry for each uid
+ uids_vid = {}
+ uvids_new = []
+ for (uid, vid) in uvids :
+ if uid not in uids_vid :
+ uids_vid[uid] = vid
+ uvids_new.append((uid,vid))
+
+ uvids = uvids_new
+
+ # TODO: total_count will be totally off if all_versions is not set
+ return (uvids, total_count)
def _parse_query(self, query_dict):
logging.debug('_parse_query %r' % query_dict)
@@ -174,10 +195,15 @@ class IndexStore(object):
query = Query(Query.OP_VALUE_RANGE, _VALUE_TIMESTAMP, start, end)
queries.append(query)
+ # TODO: refactor / simplify
uid = query_dict.pop('uid', None)
if uid is not None:
queries.append(Query(_PREFIX_UID + uid))
+ vid = query_dict.pop('vid', None)
+ if vid is not None:
+ queries.append(Query(_PREFIX_VID + vid))
+
activity = query_dict.pop('activity', None)
if activity is not None:
queries.append(Query(_PREFIX_ACTIVITY + activity))
@@ -207,8 +233,8 @@ class IndexStore(object):
return Query(Query.OP_AND, queries)
- def delete(self, uid):
- self._database.delete_document(_PREFIX_UID + uid)
+ def delete(self, uid, vid):
+ self._database.delete_document("%s%s%s%s" % (_PREFIX_UID, uid, _PREFIX_VID, vid))
def get_activities(self):
activities = []
diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py
index dc3fde6..1a75052 100644
--- a/src/carquinyol/layoutmanager.py
+++ b/src/carquinyol/layoutmanager.py
@@ -53,9 +53,9 @@ class LayoutManager(object):
version_path = os.path.join(self._root_path, 'version')
open(version_path, 'w').write(str(version))
- def get_entry_path(self, uid):
+ def get_entry_path(self, uid, vid):
# os.path.join() is just too slow
- return '%s/%s/%s' % (self._root_path, uid[:2], uid)
+ return '%s/%s/%s-%s' % (self._root_path, uid[:2], uid, vid)
def get_root_path(self):
return self._root_path
@@ -85,13 +85,13 @@ class LayoutManager(object):
index_updated = property(_is_index_updated, _set_index_updated)
def find_all(self):
- uids = []
+ uvids = []
for f in os.listdir(self._root_path):
if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2:
for g in os.listdir(os.path.join(self._root_path, f)):
- if len(g) == 36:
- uids.append(g)
- return uids
+ if len(g) == 73:
+ uvids.append((g[:36], g[37:]))
+ return uvids
_instance = None
def get_instance():
diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py
index 50981f3..b5bfeb1 100644
--- a/src/carquinyol/metadatastore.py
+++ b/src/carquinyol/metadatastore.py
@@ -6,8 +6,8 @@ from carquinyol import metadatareader
MAX_SIZE = 256
class MetadataStore(object):
- def store(self, uid, metadata):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def store(self, uid, vid, metadata):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
@@ -19,6 +19,7 @@ class MetadataStore(object):
os.remove(os.path.join(metadata_path, key))
metadata['uid'] = uid
+ metadata['vid'] = vid
for key, value in metadata.items():
# Hack to support activities that still pass properties named as for
@@ -36,19 +37,19 @@ class MetadataStore(object):
finally:
f.close()
- def retrieve(self, uid, properties=None):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def retrieve(self, uid, vid, properties=None):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
return metadatareader.retrieve(dir_path, properties)
- def delete(self, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def delete(self, uid, vid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
metadata_path = os.path.join(dir_path, 'metadata')
for key in os.listdir(metadata_path):
os.remove(os.path.join(metadata_path, key))
os.rmdir(metadata_path)
- def get_property(self, uid, key):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def get_property(self, uid, vid, key):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
metadata_path = os.path.join(dir_path, 'metadata')
property_path = os.path.join(metadata_path, key)
if os.path.exists(property_path):
@@ -56,8 +57,8 @@ class MetadataStore(object):
else:
return None
- def set_property(self, uid, key, value):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ def set_property(self, uid, vid, key, value):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
metadata_path = os.path.join(dir_path, 'metadata')
property_path = os.path.join(metadata_path, key)
open(property_path, 'w').write(value)
diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py
index 228db2a..0722e43 100644
--- a/src/carquinyol/migration.py
+++ b/src/carquinyol/migration.py
@@ -22,6 +22,7 @@ import logging
import shutil
import time
import traceback
+import uuid
import cjson
@@ -30,7 +31,7 @@ from carquinyol import layoutmanager
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
def migrate_from_0():
- logging.info('Migrating datastore from version 0 to version 1')
+ logging.info('Migrating datastore from version 0 to version 2')
root_path = layoutmanager.get_instance().get_root_path()
old_root_path = os.path.join(root_path, 'store')
@@ -43,10 +44,11 @@ def migrate_from_0():
continue
logging.debug('Migrating entry %r' % uid)
+ vid = str(uuid.uuid4())
try:
- _migrate_metadata(root_path, old_root_path, uid)
- _migrate_file(root_path, old_root_path, uid)
- _migrate_preview(root_path, old_root_path, uid)
+ _migrate_metadata_0(root_path, old_root_path, uid, vid)
+ _migrate_file_0(root_path, old_root_path, uid, vid)
+ _migrate_preview_0(root_path, old_root_path, uid, vid)
except Exception:
logging.error('Error while migrating entry %r: %s\n' % \
(uid, traceback.format_exc()))
@@ -57,8 +59,8 @@ def migrate_from_0():
logging.info('Migration finished')
-def _migrate_metadata(root_path, old_root_path, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+def _migrate_metadata_0(root_path, old_root_path, uid, vid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
metadata_path = os.path.join(dir_path, 'metadata')
os.makedirs(metadata_path)
@@ -68,6 +70,8 @@ def _migrate_metadata(root_path, old_root_path, uid):
if 'uid' not in metadata:
metadata['uid'] = uid
+ metadata['vid'] = vid
+
if 'timestamp' not in metadata and 'mtime' in metadata:
metadata['timestamp'] = \
time.mktime(time.strptime(metadata['mtime'], DATE_FORMAT))
@@ -88,15 +92,61 @@ def _migrate_metadata(root_path, old_root_path, uid):
'Error while migrating property %s of entry %s: %s\n' % \
(key, uid, traceback.format_exc()))
-def _migrate_file(root_path, old_root_path, uid):
+def _migrate_file_0(root_path, old_root_path, uid, vid):
if os.path.exists(os.path.join(old_root_path, uid)):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
os.rename(os.path.join(old_root_path, uid),
os.path.join(dir_path, 'data'))
-def _migrate_preview(root_path, old_root_path, uid):
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+def _migrate_preview_0(root_path, old_root_path, uid, vid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid, vid)
metadata_path = os.path.join(dir_path, 'metadata')
os.rename(os.path.join(old_root_path, 'preview', uid),
os.path.join(metadata_path, 'preview'))
+
+def migrate_from_1():
+ logging.info('Migrating datastore from version 1 to version 2')
+
+ root_path = layoutmanager.get_instance().get_root_path()
+ checksum_path = os.path.join(root_path, "checksums")
+
+ vids = {}
+ for hash02 in os.listdir(root_path):
+ if len(hash02) != 2 :
+ continue
+
+ for uid in os.listdir(os.path.join(root_path, hash02)) :
+ if (len(uid) != 36) :
+ continue
+
+ logging.debug('Migrating entry %r' % uid)
+
+ vid = str(uuid.uuid4())
+ vids[uid] = vid
+ try:
+ new_path = layoutmanager.get_instance().get_entry_path(uid, vid)
+ os.rename(os.path.join(root_path, hash02, uid),
+ new_path)
+ file(os.path.join(new_path, "metadata", "vid"), "w").write(vid)
+
+ except Exception:
+ logging.error('Error while migrating entry %r: %s\n' % \
+ (uid, traceback.format_exc()))
+
+ for checksum in os.listdir(checksum_path) :
+ entries_path = os.path.join(checksum_path, checksum)
+ for uid in os.listdir(entries_path) :
+ if len(uid) != 36 :
+ continue
+
+ try :
+ os.rename(os.path.join(entries_path, uid),
+ os.path.join(entries_path, "%s-%s" % (uid,vids[uid])))
+
+ except Exception:
+ logging.error('Error while migrating checksum entry %r / %r: %s\n' % \
+ (checksum, uid, traceback.format_exc()))
+
+ logging.info('Migration finished')
+
diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py
index f8a2e3e..c4a0681 100644
--- a/src/carquinyol/optimizer.py
+++ b/src/carquinyol/optimizer.py
@@ -18,6 +18,7 @@ import os
import errno
import logging
import subprocess
+import uuid
import gobject
@@ -31,33 +32,34 @@ class Optimizer(object):
self._metadata_store = metadata_store
self._enqueue_checksum_id = None
- def optimize(self, uid):
+ def optimize(self, uid, vid):
"""Add an entry to a queue of entries to be checked for duplicates.
"""
- if not os.path.exists(self._file_store.get_file_path(uid)):
+ if not os.path.exists(self._file_store.get_file_path(uid, vid)):
return
queue_path = layoutmanager.get_instance().get_queue_path()
- open(os.path.join(queue_path, uid), 'w').close()
- logging.debug('optimize %r' % os.path.join(queue_path, uid))
+ fname = os.path.join(queue_path, "%s-%s" % (uid, vid))
+ open(fname, 'w').close()
+ logging.debug('optimize %r' % fname)
if self._enqueue_checksum_id is None:
self._enqueue_checksum_id = \
gobject.idle_add(self._process_entry_cb,
priority=gobject.PRIORITY_LOW)
- def remove(self, uid):
+ def remove(self, uid, vid):
"""Remove any structures left from space optimization
"""
- checksum = self._metadata_store.get_property(uid, 'checksum')
+ checksum = self._metadata_store.get_property(uid, vid, 'checksum')
if checksum is None:
return
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- checksum_entry_path = os.path.join(checksum_path, uid)
+ checksum_entry_path = os.path.join(checksum_path, "%s-%s" % (uid,vid))
if os.path.exists(checksum_entry_path):
logging.debug('remove %r' % checksum_entry_path)
@@ -79,14 +81,14 @@ class Optimizer(object):
checksum_path = os.path.join(checksums_dir, checksum)
return os.path.exists(checksum_path)
- def _get_uid_from_checksum(self, checksum):
+ def _get_uvid_from_checksum(self, checksum):
"""Get an existing entry which file matches checksum.
"""
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- first_uid = os.listdir(checksum_path)[0]
- return first_uid
+ first_uvid = os.listdir(checksum_path)[0]
+ return (first_uvid[:36], first_uvid[37:])
def _create_checksum_dir(self, checksum):
"""Create directory that tracks files with this same checksum.
@@ -97,24 +99,25 @@ class Optimizer(object):
logging.debug('create dir %r' % checksum_path)
os.mkdir(checksum_path)
- def _add_checksum_entry(self, uid, checksum):
- """Create a file in the checksum dir with the uid of the entry
+ def _add_checksum_entry(self, uid, vid, checksum):
+ """Create a file in the checksum dir with the uid and vid of the entry
"""
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- logging.debug('touch %r' % os.path.join(checksum_path, uid))
- open(os.path.join(checksum_path, uid), 'w').close()
+ fname = os.path.join(checksum_path, "%s-%s" % (uid,vid))
+ logging.debug('touch %r' % fname)
+ open(fname, 'w').close()
- def _already_linked(self, uid, checksum):
+ def _already_linked(self, uid, vid, checksum):
"""Check if this entry's file is already a hard link to the checksums
dir.
"""
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- return os.path.exists(os.path.join(checksum_path, uid))
+ return os.path.exists(os.path.join(checksum_path, "%s-%s" % (uid,vid)))
def _process_entry_cb(self):
"""Process one item in the checksums queue by calculating its checksum,
@@ -125,30 +128,30 @@ class Optimizer(object):
queue_path = layoutmanager.get_instance().get_queue_path()
queue = os.listdir(queue_path)
if queue:
- uid = queue[0]
- logging.debug('_process_entry_cb processing %r' % uid)
+ (uid,vid) = queue[0][:36], queue[0][37:]
+ logging.debug('_process_entry_cb processing (%r,%r)' % (uid,vid))
- file_in_entry_path = self._file_store.get_file_path(uid)
+ file_in_entry_path = self._file_store.get_file_path(uid,vid)
if not os.path.exists(file_in_entry_path):
- logging.info('non-existent entry in queue: %r' % uid)
+ logging.info('non-existent entry in queue: (%r,%r)' % (uid,vid))
else:
checksum = self._calculate_md5sum(file_in_entry_path)
- self._metadata_store.set_property(uid, 'checksum', checksum)
+ self._metadata_store.set_property(uid, vid, 'checksum', checksum)
if self._identical_file_already_exists(checksum):
- if not self._already_linked(uid, checksum):
- existing_entry_uid = \
- self._get_uid_from_checksum(checksum)
+ if not self._already_linked(uid, vid, checksum):
+ existing_entry_uvid = \
+ self._get_uvid_from_checksum(checksum)
- self._file_store.hard_link_entry(uid,
- existing_entry_uid)
+ self._file_store.hard_link_entry(uid, vid,
+ *existing_entry_uvid)
- self._add_checksum_entry(uid, checksum)
+ self._add_checksum_entry(uid, vid, checksum)
else:
self._create_checksum_dir(checksum)
- self._add_checksum_entry(uid, checksum)
+ self._add_checksum_entry(uid, vid, checksum)
- os.remove(os.path.join(queue_path, uid))
+ os.remove(os.path.join(queue_path, "%s-%s" % (uid,vid)))
if len(queue) <= 1:
self._enqueue_checksum_id = None