Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/document/volume.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/document/volume.py')
-rw-r--r--sugar_network/document/volume.py288
1 files changed, 288 insertions, 0 deletions
diff --git a/sugar_network/document/volume.py b/sugar_network/document/volume.py
new file mode 100644
index 0000000..cd9132c
--- /dev/null
+++ b/sugar_network/document/volume.py
@@ -0,0 +1,288 @@
+# Copyright (C) 2011-2012 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import time
+import logging
+from contextlib import contextmanager
+from os.path import exists, join, abspath
+
+from active_document import env
+from active_document.directory import Directory
+from active_document.index import IndexWriter
+from active_document.commands import document_command, directory_command
+from active_document.commands import CommandsProcessor, property_command
+from active_document.commands import to_int, to_list
+from active_document.metadata import BlobProperty, StoredProperty, PropertyMeta
+from active_toolkit import coroutine, util, enforce
+
+
+_logger = logging.getLogger('active_document.volume')
+
+
+class _Volume(dict):
+
+ def __init__(self, root, documents, index_class, lazy_open):
+ self._root = abspath(root)
+ _logger.info('Opening %r volume', self._root)
+
+ if not exists(root):
+ os.makedirs(root)
+ self._index_class = index_class
+ self._subscriptions = {}
+ self._to_open = {}
+ self.seqno = env.Seqno(join(self._root, 'seqno'))
+
+ for document in documents:
+ if isinstance(document, basestring):
+ name = document.split('.')[-1]
+ else:
+ name = document.__name__.lower()
+ if lazy_open:
+ self._to_open[name] = document
+ else:
+ self[name] = self._open(name, document)
+
+ @property
+ def root(self):
+ return self._root
+
+ def close(self):
+ """Close operations with the server."""
+ _logger.info('Closing documents in %r', self._root)
+
+ while self:
+ __, cls = self.popitem()
+ cls.close()
+
+ def connect(self, callback, condition=None):
+ self._subscriptions[callback] = condition or {}
+
+ def disconnect(self, callback):
+ if callback in self._subscriptions:
+ del self._subscriptions[callback]
+
+ def populate(self):
+ for cls in self.values():
+ for __ in cls.populate():
+ coroutine.dispatch()
+
+ def notify(self, event):
+ for callback, condition in self._subscriptions.items():
+ for key, value in condition.items():
+ if event.get(key) not in ('*', value):
+ break
+ else:
+ try:
+ callback(event)
+ except Exception:
+ util.exception(_logger, 'Failed to dispatch %r', event)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+
+ def __getitem__(self, name):
+ directory = self.get(name)
+ if directory is None:
+ enforce(name in self._to_open, env.BadRequest,
+ 'Unknown %r document', name)
+ directory = self[name] = self._open(name, self._to_open.pop(name))
+ return directory
+
+ def _open(self, name, document):
+ if isinstance(document, basestring):
+ mod = __import__(document, fromlist=[name])
+ cls = getattr(mod, name.capitalize())
+ else:
+ cls = document
+ directory = Directory(join(self._root, name), cls, self._index_class,
+ self.notify, self.seqno)
+ return directory
+
+
+class SingleVolume(_Volume):
+
+ def __init__(self, root, document_classes, lazy_open=False):
+ enforce(env.index_write_queue.value > 0,
+ 'The active_document.index_write_queue.value should be > 0')
+ _Volume.__init__(self, root, document_classes, IndexWriter, lazy_open)
+
+
+class VolumeCommands(CommandsProcessor):
+
+ def __init__(self, volume):
+ CommandsProcessor.__init__(self, volume)
+ self.volume = volume
+
+ @directory_command(method='POST',
+ permissions=env.ACCESS_AUTH, mime_type='application/json')
+ def create(self, request):
+ with self._post(request, env.ACCESS_CREATE) as (directory, doc):
+ enforce('guid' not in doc.props, env.Forbidden,
+ "Property 'guid' cannot be set manually")
+ self.before_create(request, doc.props)
+ for prop in directory.metadata.values():
+ if prop.on_set is not None and \
+ not prop.permissions & env.ACCESS_CREATE:
+ doc[prop.name] = prop.on_set(doc, prop.default)
+ doc.guid = directory.create(doc.props)
+ return doc.guid
+
+ @directory_command(method='GET',
+ arguments={'offset': to_int, 'limit': to_int, 'reply': to_list},
+ mime_type='application/json')
+ def find(self, document, reply, request):
+ if not reply:
+ request['reply'] = ['guid']
+ self._preget(request)
+ documents, total = self.volume[document].find(**request)
+ result = [self._get_props(i, request) for i in documents]
+ return {'total': total, 'result': result}
+
+ @document_command(method='GET', cmd='exists',
+ mime_type='application/json')
+ def exists(self, document, guid):
+ directory = self.volume[document]
+ return directory.exists(guid)
+
+ @document_command(method='PUT',
+ permissions=env.ACCESS_AUTH | env.ACCESS_AUTHOR)
+ def update(self, request):
+ with self._post(request, env.ACCESS_WRITE) as (directory, doc):
+ modified = bool(doc.props)
+ self.before_update(request, doc.props)
+ if modified:
+ directory.update(doc.guid, doc.props)
+
+ @property_command(method='PUT',
+ permissions=env.ACCESS_AUTH | env.ACCESS_AUTHOR)
+ def update_prop(self, request, prop, url=None):
+ if url:
+ value = PropertyMeta(url=url)
+ elif request.content is None:
+ value = request.content_stream
+ else:
+ value = request.content
+ request.content = {prop: value}
+ self.update(request)
+
+ @document_command(method='DELETE',
+ permissions=env.ACCESS_AUTH | env.ACCESS_AUTHOR)
+ def delete(self, document, guid):
+ directory = self.volume[document]
+ directory.delete(guid)
+
+ @document_command(method='GET', arguments={'reply': to_list},
+ mime_type='application/json')
+ def get(self, document, guid, request):
+ self._preget(request)
+ doc = self.volume[document].get(guid)
+ return self._get_props(doc, request)
+
+ @property_command(method='GET', mime_type='application/json')
+ def get_prop(self, document, guid, prop, request, response, part=None):
+ directory = self.volume[document]
+ prop = directory.metadata[prop]
+ doc = directory.get(guid)
+ doc.request = request
+
+ prop.assert_access(env.ACCESS_READ)
+
+ if isinstance(prop, StoredProperty):
+ value = doc.get(prop.name, request.accept_language)
+ value = prop.on_get(doc, value)
+ if value is None:
+ value = prop.default
+ return value
+ else:
+ meta = prop.on_get(doc, doc.meta(prop.name))
+ enforce(meta is not None and ('path' in meta or 'url' in meta),
+ env.NotFound, 'BLOB does not exist')
+ return meta
+
+ def before_create(self, request, props):
+ ts = int(time.time())
+ props['ctime'] = ts
+ props['mtime'] = ts
+
+ def before_update(self, request, props):
+ props['mtime'] = int(time.time())
+
+ @contextmanager
+ def _post(self, request, access):
+ enforce(isinstance(request.content, dict), 'Invalid value')
+
+ directory = self.volume[request['document']]
+ if 'guid' in request:
+ doc = directory.get(request['guid'])
+ else:
+ doc = directory.document_class(None, {})
+ doc.request = request
+ blobs = []
+
+ for name, value in request.content.items():
+ prop = directory.metadata[name]
+ if isinstance(prop, BlobProperty) and access == env.ACCESS_WRITE:
+ if doc.meta(name) is None:
+ prop.assert_access(env.ACCESS_CREATE)
+ else:
+ prop.assert_access(env.ACCESS_WRITE)
+ else:
+ prop.assert_access(access)
+ if prop.on_set is not None:
+ value = prop.on_set(doc, value)
+ if isinstance(prop, BlobProperty):
+ enforce(PropertyMeta.is_blob(value), 'Invalid BLOB value')
+ blobs.append((name, value))
+ else:
+ if prop.localized and isinstance(value, basestring):
+ value = {request.accept_language[0]: value}
+ try:
+ doc.props[name] = prop.decode(value)
+ except Exception, error:
+ error = 'Value %r for %r property is invalid: %s' % \
+ (value, prop.name, error)
+ util.exception(error)
+ raise RuntimeError(error)
+
+ yield directory, doc
+
+ for name, value in blobs:
+ directory.set_blob(doc.guid, name, value,
+ mime_type=request.content_type)
+
+ def _preget(self, request):
+ metadata = self.volume[request['document']].metadata
+ reply = request.setdefault('reply', [])
+ if reply:
+ for prop in reply:
+ metadata[prop].assert_access(env.ACCESS_READ)
+ else:
+ reply.append('guid')
+
+ def _get_props(self, doc, request):
+ result = {}
+ metadata = doc.metadata
+ doc.request = request
+ for name in request['reply']:
+ prop = metadata[name]
+ value = prop.on_get(doc, doc.get(name, request.accept_language))
+ if value is None:
+ value = prop.default
+ result[name] = value
+ return result