Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/db/directory.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/db/directory.py')
-rw-r--r--sugar_network/db/directory.py263
1 files changed, 62 insertions, 201 deletions
diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py
index 944f73a..c6957d7 100644
--- a/sugar_network/db/directory.py
+++ b/sugar_network/db/directory.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2011-2013 Aleksey Lim
+# Copyright (C) 2011-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -16,15 +16,12 @@
import os
import shutil
import logging
-from cStringIO import StringIO
from os.path import exists, join
from sugar_network import toolkit
-from sugar_network.toolkit.router import ACL
from sugar_network.db.storage import Storage
-from sugar_network.db.metadata import BlobProperty, Metadata, GUID_PREFIX
-from sugar_network.db.metadata import IndexedProperty, StoredProperty
-from sugar_network.db.metadata import AggregatedType
+from sugar_network.db.metadata import Metadata, Guid
+from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import http, exception, enforce
@@ -36,8 +33,7 @@ _logger = logging.getLogger('db.directory')
class Directory(object):
- def __init__(self, root, resource_class, index_class,
- broadcast=None, seqno=None):
+ def __init__(self, root, resource_class, index_class, seqno=None):
"""
:param index_class:
what class to use to access to indexes, for regular casses
@@ -51,12 +47,10 @@ class Directory(object):
if resource_class.metadata is None:
# Metadata cannot be recreated
resource_class.metadata = Metadata(resource_class)
- resource_class.metadata['guid'] = IndexedProperty('guid',
- slot=0, prefix=GUID_PREFIX, acl=ACL.CREATE | ACL.READ)
+ resource_class.metadata['guid'] = Guid()
self.metadata = resource_class.metadata
self.resource_class = resource_class
- self.broadcast = broadcast or (lambda event: None)
self._index_class = index_class
self._root = root
self._seqno = _SessionSeqno() if seqno is None else seqno
@@ -65,25 +59,6 @@ class Directory(object):
self._open()
- @property
- def mtime(self):
- return self._index.mtime
-
- def checkpoint(self):
- ts = self._index.checkpoint()
- self.broadcast({'event': 'populate', 'mtime': ts})
-
- def path(self, guid, *args):
- record = self._storage.get(guid)
- if not args:
- return record.path()
- prop = args[0]
- if prop in self.metadata and \
- isinstance(self.metadata[prop], BlobProperty):
- return record.blob_path(*args)
- else:
- return record.path(*args)
-
def wipe(self):
self.close()
_logger.debug('Wipe %r directory', self.metadata.name)
@@ -102,7 +77,7 @@ class Directory(object):
"""Flush pending chnages to disk."""
self._index.commit()
- def create(self, props, event=None, setters=False):
+ def create(self, props):
"""Create new document.
If `guid` property is not specified, it will be auto set.
@@ -116,24 +91,12 @@ class Directory(object):
guid = props.get('guid')
if not guid:
guid = props['guid'] = toolkit.uuid()
- if setters:
- # XXX Setters are being proccessed on routes level, but,
- # while creating resources gotten from routes, it is important
- # to call setters as well, e.g., `author` property
- doc = self.resource_class(guid, None, props)
- for key, value in props.items():
- prop = self.metadata.get(key)
- if prop is not None and prop.on_set is not None:
- props[key] = prop.on_set(doc, value)
_logger.debug('Create %s[%s]: %r', self.metadata.name, guid, props)
- post_event = {'event': 'create', 'guid': guid}
- if event:
- post_event.update(event)
- self._index.store(guid, props, self._pre_store, self._post_store,
- post_event)
+ event = {'event': 'create', 'guid': guid}
+ self._index.store(guid, props, self._prestore, self._broadcast, event)
return guid
- def update(self, guid, props, event=None):
+ def update(self, guid, props):
"""Update properties for an existing document.
:param guid:
@@ -143,11 +106,8 @@ class Directory(object):
"""
_logger.debug('Update %s[%s]: %r', self.metadata.name, guid, props)
- post_event = {'event': 'update', 'guid': guid}
- if event:
- post_event.update(event)
- self._index.store(guid, props, self._pre_store, self._post_store,
- post_event)
+ event = {'event': 'update', 'guid': guid}
+ self._index.store(guid, props, self._prestore, self._broadcast, event)
def delete(self, guid):
"""Delete document.
@@ -158,7 +118,7 @@ class Directory(object):
"""
_logger.debug('Delete %s[%s]', self.metadata.name, guid)
event = {'event': 'delete', 'guid': guid}
- self._index.delete(guid, self._post_delete, event)
+ self._index.delete(guid, self._postdelete, guid, event)
def exists(self, guid):
return self._storage.get(guid).consistent
@@ -171,6 +131,9 @@ class Directory(object):
guid, self.metadata.name)
return self.resource_class(guid, record, cached_props)
+ def __getitem__(self, guid):
+ return self.get(guid)
+
def find(self, **kwargs):
mset = self._index.find(**kwargs)
@@ -195,9 +158,9 @@ class Directory(object):
"""
found = False
- migrate = (self.mtime == 0)
+ migrate = (self._index.mtime == 0)
- for guid in self._storage.walk(self.mtime):
+ for guid in self._storage.walk(self._index.mtime):
if not found:
_logger.info('Start populating %r index', self.metadata.name)
found = True
@@ -208,9 +171,7 @@ class Directory(object):
record = self._storage.get(guid)
try:
props = {}
- for name, prop in self.metadata.items():
- if not isinstance(prop, StoredProperty):
- continue
+ for name in self.metadata:
meta = record.get(name)
if meta is not None:
props[name] = meta['value']
@@ -224,33 +185,11 @@ class Directory(object):
if found:
self._save_layout()
self.commit()
- self.checkpoint()
-
- def patch(self, guid, props, accept_language=None):
- if not accept_language:
- accept_language = toolkit.default_lang()
- orig = self.get(guid)
- patch = {}
- for prop, value in (props or {}).items():
- if orig[prop] == value:
- continue
- if isinstance(self.metadata[prop], StoredProperty) and \
- self.metadata[prop].localized:
- if isinstance(value, dict):
- if value == dict([(i, orig[prop].get(i)) for i in value]):
- continue
- elif orig.get(prop, accept_language) == value:
- continue
- elif isinstance(self.metadata[prop], BlobProperty) and \
- isinstance(value, dict) and \
- value.get('digest') == orig[prop].get('digest'):
- continue
- patch[prop] = value
- return patch
def diff(self, seq, exclude_seq=None, **params):
- if exclude_seq is None:
- exclude_seq = []
+ if exclude_seq is not None:
+ for start, end in exclude_seq:
+ seq.exclude(start, end)
if 'group_by' in params:
# Pickup only most recent change
params['order_by'] = '-seqno'
@@ -263,82 +202,30 @@ class Directory(object):
if end:
query += str(end)
documents, __ = self.find(query=query, **params)
-
for doc in documents:
+ yield doc.guid, doc.diff(seq)
- def patch():
- for name, prop in self.metadata.items():
- if name == 'seqno' or prop.acl & ACL.CALC:
- continue
- meta = doc.meta(name)
- if meta is None:
- continue
- seqno = meta.get('seqno')
- if seqno not in seq or seqno in exclude_seq:
- continue
- if isinstance(prop, BlobProperty):
- del meta['seqno']
- else:
- value = meta.get('value')
- if prop.typecast is AggregatedType:
- value_ = {}
- for key, agg in value.items():
- aggseqno = agg.pop('seqno')
- if aggseqno >= start and \
- (not end or aggseqno <= end):
- value_[key] = agg
- value = value_
- meta = {'mtime': meta['mtime'], 'value': value}
- yield name, meta, seqno
-
- yield doc.guid, patch()
-
- def merge(self, guid, diff, shift_seqno=True, op=None, **kwargs):
+ def merge(self, guid, diff):
"""Apply changes for documents."""
- record = self._storage.get(guid)
- seqno = None
- merge = {}
- patch = {}
+ doc = self.resource_class(guid, self._storage.get(guid))
for prop, meta in diff.items():
- orig_meta = record.get(prop)
- if orig_meta is not None and orig_meta['mtime'] >= meta['mtime']:
+ orig_meta = doc.meta(prop)
+ if orig_meta and orig_meta['mtime'] >= meta['mtime']:
continue
- if shift_seqno:
- if not seqno:
- seqno = self._seqno.next()
- meta['seqno'] = seqno
- else:
- meta['seqno'] = (orig_meta or {}).get('seqno') or 0
- meta.update(kwargs)
- if self.metadata.get(prop).typecast is AggregatedType:
- for agg in meta['value'].values():
- agg['seqno'] = meta['seqno']
- if orig_meta:
- orig_meta['value'].update(meta['value'])
- meta['value'] = orig_meta['value']
- merge[prop] = meta
- if op is not None:
- patch[prop] = meta.get('value')
-
- if not merge:
- return seqno, False
-
- if op is not None:
- op(patch)
- for prop, meta in merge.items():
- is_blob = isinstance(self.metadata.get(prop), BlobProperty)
- record.set(prop, cleanup_blob=is_blob, **meta)
-
- if record.consistent:
- props = {}
- if seqno:
- props['seqno'] = seqno
+ if doc.post_seqno is None:
+ doc.post_seqno = self._seqno.next()
+ doc.post(prop, **meta)
+
+ if doc.post_seqno is None:
+ return None, False
+
+ if doc.exists:
# No need in after-merge event, further commit event
- # is enough to avoid events flow on nodes synchronization
- self._index.store(guid, props, self._pre_store, self._post_store)
+ # is enough to avoid increasing events flow
+ self._index.store(guid, doc.props, self._preindex)
- return seqno, True
+ return doc.post_seqno, True
def _open(self):
if not exists(self._root):
@@ -352,63 +239,37 @@ class Directory(object):
self._save_layout()
self._storage = Storage(self._root, self.metadata)
self._index = self._index_class(index_path, self.metadata,
- self._post_commit)
+ self._postcommit)
_logger.debug('Open %r resource', self.resource_class)
- def _pre_store(self, guid, changes, event=None):
- seqno = changes.get('seqno')
- if event is not None and not seqno:
- seqno = changes['seqno'] = self._seqno.next()
+ def _broadcast(self, event):
+ event['resource'] = self.metadata.name
+ this.broadcast(event)
+
+ def _preindex(self, guid, changes):
+ doc = self.resource_class(guid, self._storage.get(guid), changes)
+ for prop in self.metadata:
+ enforce(doc[prop] is not None, 'Empty %r property', prop)
+ return doc.props
+
+ def _prestore(self, guid, changes, event):
+ doc = self.resource_class(guid, self._storage.get(guid))
+ doc.post_seqno = self._seqno.next()
+ for prop in self.metadata.keys():
+ value = changes.get(prop)
+ if value is None:
+ enforce(doc[prop] is not None, 'Empty %r property', prop)
+ else:
+ doc.post(prop, value)
+ return doc.props
- record = self._storage.get(guid)
- existed = record.exists
-
- for name, prop in self.metadata.items():
- value = changes.get(name)
- if isinstance(prop, BlobProperty):
- if isinstance(value, dict):
- record.set(name, seqno=seqno, cleanup_blob=True, **value)
- elif isinstance(value, basestring):
- record.set(name, seqno=seqno, blob=StringIO(value))
- elif isinstance(prop, StoredProperty):
- if value is None:
- enforce(existed or prop.default is not None,
- 'Value is not specified for %r property', name)
- meta = record.get(name)
- if meta is not None:
- value = meta['value']
- changes[name] = prop.default if value is None else value
- else:
- if prop.typecast is AggregatedType:
- for aggvalue in value.values():
- aggvalue['seqno'] = seqno
- if existed:
- value_ = record.get(name)['value']
- value_.update(value)
- value = value_
- elif prop.localized:
- if not isinstance(value, dict):
- value = {toolkit.default_lang(): value}
- if existed and \
- type(value) is dict: # TODO To reset `value`
- meta = record.get(name)
- if meta is not None:
- meta['value'].update(value)
- value = meta['value']
- changes[name] = value
- record.set(name, value=value, seqno=seqno)
-
- def _post_store(self, guid, changes, event=None):
- if event is not None:
- self.broadcast(event)
-
- def _post_delete(self, guid, event):
+ def _postdelete(self, guid, event):
self._storage.delete(guid)
- self.broadcast(event)
+ self._broadcast(event)
- def _post_commit(self):
+ def _postcommit(self):
self._seqno.commit()
- self.broadcast({'event': 'commit', 'mtime': self.mtime})
+ self._broadcast({'event': 'commit', 'mtime': self._index.mtime})
def _save_layout(self):
path = join(self._root, 'layout')