diff options
Diffstat (limited to 'sugar_network/db/directory.py')
-rw-r--r-- | sugar_network/db/directory.py | 263 |
1 files changed, 62 insertions, 201 deletions
diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py index 944f73a..c6957d7 100644 --- a/sugar_network/db/directory.py +++ b/sugar_network/db/directory.py @@ -1,4 +1,4 @@ -# Copyright (C) 2011-2013 Aleksey Lim +# Copyright (C) 2011-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -16,15 +16,12 @@ import os import shutil import logging -from cStringIO import StringIO from os.path import exists, join from sugar_network import toolkit -from sugar_network.toolkit.router import ACL from sugar_network.db.storage import Storage -from sugar_network.db.metadata import BlobProperty, Metadata, GUID_PREFIX -from sugar_network.db.metadata import IndexedProperty, StoredProperty -from sugar_network.db.metadata import AggregatedType +from sugar_network.db.metadata import Metadata, Guid +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, exception, enforce @@ -36,8 +33,7 @@ _logger = logging.getLogger('db.directory') class Directory(object): - def __init__(self, root, resource_class, index_class, - broadcast=None, seqno=None): + def __init__(self, root, resource_class, index_class, seqno=None): """ :param index_class: what class to use to access to indexes, for regular casses @@ -51,12 +47,10 @@ class Directory(object): if resource_class.metadata is None: # Metadata cannot be recreated resource_class.metadata = Metadata(resource_class) - resource_class.metadata['guid'] = IndexedProperty('guid', - slot=0, prefix=GUID_PREFIX, acl=ACL.CREATE | ACL.READ) + resource_class.metadata['guid'] = Guid() self.metadata = resource_class.metadata self.resource_class = resource_class - self.broadcast = broadcast or (lambda event: None) self._index_class = index_class self._root = root self._seqno = _SessionSeqno() if seqno is None else seqno @@ -65,25 +59,6 @@ class Directory(object): self._open() - @property - def mtime(self): - return self._index.mtime - - def checkpoint(self): - ts = self._index.checkpoint() - self.broadcast({'event': 'populate', 'mtime': ts}) - - def path(self, guid, *args): - record = self._storage.get(guid) - if not args: - return record.path() - prop = args[0] - if prop in self.metadata and \ - isinstance(self.metadata[prop], BlobProperty): - return record.blob_path(*args) - else: - return record.path(*args) - def wipe(self): self.close() _logger.debug('Wipe %r directory', self.metadata.name) @@ -102,7 +77,7 @@ class Directory(object): """Flush pending chnages to disk.""" self._index.commit() - def create(self, props, event=None, setters=False): + def create(self, props): """Create new document. If `guid` property is not specified, it will be auto set. @@ -116,24 +91,12 @@ class Directory(object): guid = props.get('guid') if not guid: guid = props['guid'] = toolkit.uuid() - if setters: - # XXX Setters are being proccessed on routes level, but, - # while creating resources gotten from routes, it is important - # to call setters as well, e.g., `author` property - doc = self.resource_class(guid, None, props) - for key, value in props.items(): - prop = self.metadata.get(key) - if prop is not None and prop.on_set is not None: - props[key] = prop.on_set(doc, value) _logger.debug('Create %s[%s]: %r', self.metadata.name, guid, props) - post_event = {'event': 'create', 'guid': guid} - if event: - post_event.update(event) - self._index.store(guid, props, self._pre_store, self._post_store, - post_event) + event = {'event': 'create', 'guid': guid} + self._index.store(guid, props, self._prestore, self._broadcast, event) return guid - def update(self, guid, props, event=None): + def update(self, guid, props): """Update properties for an existing document. :param guid: @@ -143,11 +106,8 @@ class Directory(object): """ _logger.debug('Update %s[%s]: %r', self.metadata.name, guid, props) - post_event = {'event': 'update', 'guid': guid} - if event: - post_event.update(event) - self._index.store(guid, props, self._pre_store, self._post_store, - post_event) + event = {'event': 'update', 'guid': guid} + self._index.store(guid, props, self._prestore, self._broadcast, event) def delete(self, guid): """Delete document. @@ -158,7 +118,7 @@ class Directory(object): """ _logger.debug('Delete %s[%s]', self.metadata.name, guid) event = {'event': 'delete', 'guid': guid} - self._index.delete(guid, self._post_delete, event) + self._index.delete(guid, self._postdelete, guid, event) def exists(self, guid): return self._storage.get(guid).consistent @@ -171,6 +131,9 @@ class Directory(object): guid, self.metadata.name) return self.resource_class(guid, record, cached_props) + def __getitem__(self, guid): + return self.get(guid) + def find(self, **kwargs): mset = self._index.find(**kwargs) @@ -195,9 +158,9 @@ class Directory(object): """ found = False - migrate = (self.mtime == 0) + migrate = (self._index.mtime == 0) - for guid in self._storage.walk(self.mtime): + for guid in self._storage.walk(self._index.mtime): if not found: _logger.info('Start populating %r index', self.metadata.name) found = True @@ -208,9 +171,7 @@ class Directory(object): record = self._storage.get(guid) try: props = {} - for name, prop in self.metadata.items(): - if not isinstance(prop, StoredProperty): - continue + for name in self.metadata: meta = record.get(name) if meta is not None: props[name] = meta['value'] @@ -224,33 +185,11 @@ class Directory(object): if found: self._save_layout() self.commit() - self.checkpoint() - - def patch(self, guid, props, accept_language=None): - if not accept_language: - accept_language = toolkit.default_lang() - orig = self.get(guid) - patch = {} - for prop, value in (props or {}).items(): - if orig[prop] == value: - continue - if isinstance(self.metadata[prop], StoredProperty) and \ - self.metadata[prop].localized: - if isinstance(value, dict): - if value == dict([(i, orig[prop].get(i)) for i in value]): - continue - elif orig.get(prop, accept_language) == value: - continue - elif isinstance(self.metadata[prop], BlobProperty) and \ - isinstance(value, dict) and \ - value.get('digest') == orig[prop].get('digest'): - continue - patch[prop] = value - return patch def diff(self, seq, exclude_seq=None, **params): - if exclude_seq is None: - exclude_seq = [] + if exclude_seq is not None: + for start, end in exclude_seq: + seq.exclude(start, end) if 'group_by' in params: # Pickup only most recent change params['order_by'] = '-seqno' @@ -263,82 +202,30 @@ class Directory(object): if end: query += str(end) documents, __ = self.find(query=query, **params) - for doc in documents: + yield doc.guid, doc.diff(seq) - def patch(): - for name, prop in self.metadata.items(): - if name == 'seqno' or prop.acl & ACL.CALC: - continue - meta = doc.meta(name) - if meta is None: - continue - seqno = meta.get('seqno') - if seqno not in seq or seqno in exclude_seq: - continue - if isinstance(prop, BlobProperty): - del meta['seqno'] - else: - value = meta.get('value') - if prop.typecast is AggregatedType: - value_ = {} - for key, agg in value.items(): - aggseqno = agg.pop('seqno') - if aggseqno >= start and \ - (not end or aggseqno <= end): - value_[key] = agg - value = value_ - meta = {'mtime': meta['mtime'], 'value': value} - yield name, meta, seqno - - yield doc.guid, patch() - - def merge(self, guid, diff, shift_seqno=True, op=None, **kwargs): + def merge(self, guid, diff): """Apply changes for documents.""" - record = self._storage.get(guid) - seqno = None - merge = {} - patch = {} + doc = self.resource_class(guid, self._storage.get(guid)) for prop, meta in diff.items(): - orig_meta = record.get(prop) - if orig_meta is not None and orig_meta['mtime'] >= meta['mtime']: + orig_meta = doc.meta(prop) + if orig_meta and orig_meta['mtime'] >= meta['mtime']: continue - if shift_seqno: - if not seqno: - seqno = self._seqno.next() - meta['seqno'] = seqno - else: - meta['seqno'] = (orig_meta or {}).get('seqno') or 0 - meta.update(kwargs) - if self.metadata.get(prop).typecast is AggregatedType: - for agg in meta['value'].values(): - agg['seqno'] = meta['seqno'] - if orig_meta: - orig_meta['value'].update(meta['value']) - meta['value'] = orig_meta['value'] - merge[prop] = meta - if op is not None: - patch[prop] = meta.get('value') - - if not merge: - return seqno, False - - if op is not None: - op(patch) - for prop, meta in merge.items(): - is_blob = isinstance(self.metadata.get(prop), BlobProperty) - record.set(prop, cleanup_blob=is_blob, **meta) - - if record.consistent: - props = {} - if seqno: - props['seqno'] = seqno + if doc.post_seqno is None: + doc.post_seqno = self._seqno.next() + doc.post(prop, **meta) + + if doc.post_seqno is None: + return None, False + + if doc.exists: # No need in after-merge event, further commit event - # is enough to avoid events flow on nodes synchronization - self._index.store(guid, props, self._pre_store, self._post_store) + # is enough to avoid increasing events flow + self._index.store(guid, doc.props, self._preindex) - return seqno, True + return doc.post_seqno, True def _open(self): if not exists(self._root): @@ -352,63 +239,37 @@ class Directory(object): self._save_layout() self._storage = Storage(self._root, self.metadata) self._index = self._index_class(index_path, self.metadata, - self._post_commit) + self._postcommit) _logger.debug('Open %r resource', self.resource_class) - def _pre_store(self, guid, changes, event=None): - seqno = changes.get('seqno') - if event is not None and not seqno: - seqno = changes['seqno'] = self._seqno.next() + def _broadcast(self, event): + event['resource'] = self.metadata.name + this.broadcast(event) + + def _preindex(self, guid, changes): + doc = self.resource_class(guid, self._storage.get(guid), changes) + for prop in self.metadata: + enforce(doc[prop] is not None, 'Empty %r property', prop) + return doc.props + + def _prestore(self, guid, changes, event): + doc = self.resource_class(guid, self._storage.get(guid)) + doc.post_seqno = self._seqno.next() + for prop in self.metadata.keys(): + value = changes.get(prop) + if value is None: + enforce(doc[prop] is not None, 'Empty %r property', prop) + else: + doc.post(prop, value) + return doc.props - record = self._storage.get(guid) - existed = record.exists - - for name, prop in self.metadata.items(): - value = changes.get(name) - if isinstance(prop, BlobProperty): - if isinstance(value, dict): - record.set(name, seqno=seqno, cleanup_blob=True, **value) - elif isinstance(value, basestring): - record.set(name, seqno=seqno, blob=StringIO(value)) - elif isinstance(prop, StoredProperty): - if value is None: - enforce(existed or prop.default is not None, - 'Value is not specified for %r property', name) - meta = record.get(name) - if meta is not None: - value = meta['value'] - changes[name] = prop.default if value is None else value - else: - if prop.typecast is AggregatedType: - for aggvalue in value.values(): - aggvalue['seqno'] = seqno - if existed: - value_ = record.get(name)['value'] - value_.update(value) - value = value_ - elif prop.localized: - if not isinstance(value, dict): - value = {toolkit.default_lang(): value} - if existed and \ - type(value) is dict: # TODO To reset `value` - meta = record.get(name) - if meta is not None: - meta['value'].update(value) - value = meta['value'] - changes[name] = value - record.set(name, value=value, seqno=seqno) - - def _post_store(self, guid, changes, event=None): - if event is not None: - self.broadcast(event) - - def _post_delete(self, guid, event): + def _postdelete(self, guid, event): self._storage.delete(guid) - self.broadcast(event) + self._broadcast(event) - def _post_commit(self): + def _postcommit(self): self._seqno.commit() - self.broadcast({'event': 'commit', 'mtime': self.mtime}) + self._broadcast({'event': 'commit', 'mtime': self._index.mtime}) def _save_layout(self): path = join(self._root, 'layout') |