Diffstat (limited to 'sugar_network/document/directory.py')
-rw-r--r--  sugar_network/document/directory.py  417
1 file changed, 417 insertions, 0 deletions
diff --git a/sugar_network/document/directory.py b/sugar_network/document/directory.py
new file mode 100644
index 0000000..75044cb
--- /dev/null
+++ b/sugar_network/document/directory.py
@@ -0,0 +1,417 @@
+# Copyright (C) 2011-2012 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import re
+import shutil
+import logging
+from os.path import exists, join
+
+from active_document import env
+from active_document.storage import Storage
+from active_document.metadata import Metadata
+from active_document.metadata import ActiveProperty, StoredProperty
+from active_toolkit import util, enforce
+
+
+# To invalidate an existing index on structure changes
+_LAYOUT_VERSION = 3
+
+_GUID_RE = re.compile('[a-zA-Z0-9_+.-]+$')
+
+_logger = logging.getLogger('active_document.document')
+
+
+class Directory(object):
+
+ def __init__(self, root, document_class, index_class,
+ notification_cb=None, seqno=None):
+ """
+ :param index_class:
+ class to use to access the indexes; in regular cases (using
+ `Master` and `Node`), it will always be a ProxyIndex to keep
+ the writer in a separate process.
+
+ """
+ if not exists(root):
+ os.makedirs(root)
+
+ if document_class.metadata is None:
+ # Metadata cannot be recreated
+ document_class.metadata = Metadata(document_class)
+ document_class.metadata['guid'] = ActiveProperty('guid',
+ permissions=env.ACCESS_CREATE | env.ACCESS_READ, slot=0,
+ prefix=env.GUID_PREFIX)
+ self.metadata = document_class.metadata
+
+ self.document_class = document_class
+ self._root = root
+ self._notification_cb = notification_cb
+ self._seqno = _SessionSeqno() if seqno is None else seqno
+
+ index_path = join(root, 'index')
+ if self._is_layout_stale():
+ if exists(index_path):
+ _logger.warning('%r layout is stale, remove index',
+ self.metadata.name)
+ shutil.rmtree(index_path, ignore_errors=True)
+ self._save_layout()
+
+ self._storage = Storage(root, self.metadata)
+ self._index = index_class(index_path, self.metadata, self._post_commit)
+
+ _logger.debug('Initiated %r document', document_class)
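+
+ # Illustrative sketch only, not part of the module: `MyDocument` and
+ # `IndexWriter` are hypothetical stand-ins for a concrete document class
+ # and an index access class that matches the
+ # `index_class(index_path, metadata, commit_cb)` call used above.
+ #
+ #     directory = Directory('/path/to/db/document', MyDocument, IndexWriter)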
+
+ @property
+ def mtime(self):
+ return self._index.mtime
+
+ @mtime.setter
+ def mtime(self, value):
+ self._index.mtime = value
+ self._notify({'event': 'populate', 'props': {'mtime': value}})
+
+ def close(self):
+ """Flush index write pending queue and close the index."""
+ self._index.close()
+ self._storage = None
+ self._index = None
+
+ def commit(self):
+ """Flush pending chnages to disk."""
+ self._index.commit()
+
+ def create(self, props=None, **kwargs):
+ """Create new document.
+
+ If the `guid` property is not specified, it will be set automatically.
+
+ :param kwargs:
+ new document properties
+ :returns:
+ GUID of newly created document
+
+ """
+ if props is None:
+ props = kwargs
+
+ if 'guid' in props:
+ guid = props['guid']
+ enforce(_GUID_RE.match(guid) is not None, 'Malformed GUID')
+ enforce(not self.exists(guid), 'Document already exists')
+ else:
+ guid = props['guid'] = env.uuid()
+
+ for prop_name, prop in self.metadata.items():
+ if isinstance(prop, StoredProperty):
+ if prop_name in props:
+ continue
+ enforce(prop.default is not None,
+ 'Property %r should be passed for new %r document',
+ prop_name, self.metadata.name)
+ if prop.default is not None:
+ props[prop_name] = prop.default
+
+ _logger.debug('Create %s[%s]: %r', self.metadata.name, guid, props)
+ self._post(guid, props, True)
+ return guid
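+
+ # Usage sketch, illustrative only: `directory` is a Directory instance and
+ # `title` a hypothetical StoredProperty of its document class.
+ #
+ #     guid = directory.create(title='First document')   # GUID is generated
+ #     guid = directory.create({'guid': 'doc-2', 'title': 'Second document'})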
+
+ def update(self, guid, props=None, **kwargs):
+ """Update properties for an existing document.
+
+ :param guid:
+ document GUID to store
+ :param kwargs:
+ properties to store, not necessarily all of the document's properties
+
+ """
+ if props is None:
+ props = kwargs
+ if not props:
+ return
+ _logger.debug('Update %s[%s]: %r', self.metadata.name, guid, props)
+ self._post(guid, props, False)
+
+ def delete(self, guid):
+ """Delete document.
+
+ :param guid:
+ document GUID to delete
+
+ """
+ _logger.debug('Delete %s[%s]', self.metadata.name, guid)
+ event = {'event': 'delete', 'guid': guid}
+ self._index.delete(guid, self._post_delete, event)
+
+ def exists(self, guid):
+ return self._storage.get(guid).consistent
+
+ def get(self, guid):
+ cached_props = self._index.get_cached(guid)
+ record = self._storage.get(guid)
+ enforce(cached_props or record.exists, env.NotFound,
+ 'Document %r does not exist in %r',
+ guid, self.metadata.name)
+ return self.document_class(guid, record, cached_props)
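+
+ # Usage sketch for the accessors above, illustrative only (`title` is a
+ # hypothetical property):
+ #
+ #     if directory.exists(guid):
+ #         directory.update(guid, title='New title')
+ #         doc = directory.get(guid)
+ #     directory.delete(guid)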
+
+ def find(self, *args, **kwargs):
+ """Search documents.
+
+ The result is a lazy iterator over the found documents. The function
+ accepts the same arguments as `active_document.Query`.
+
+ :returns:
+ a tuple of (`documents`, `total_count`), where `total_count` is
+ the total number of documents matching the search parameters,
+ i.e., not only the documents included in the resulting list
+
+ """
+ mset = self._index.find(env.Query(*args, **kwargs))
+
+ def iterate():
+ for hit in mset:
+ guid = hit.document.get_value(0)
+ record = self._storage.get(guid)
+ yield self.document_class(guid, record)
+
+ return iterate(), mset.get_matches_estimated()
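+
+ # Usage sketch, illustrative only; query arguments are the same as for
+ # `active_document.Query`.
+ #
+ #     documents, total = directory.find(query='example', limit=10)
+ #     for doc in documents:
+ #         print doc.guid, total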
+
+ def set_blob(self, guid, prop, data=None, size=None, mime_type=None,
+ **kwargs):
+ """Receive BLOB property.
+
+ This function works in parallel with setting non-BLOB property values
+ and the `post()` function.
+
+ :param prop:
+ BLOB property name
+ :param data:
+ stream to read the BLOB content from, a path to a file to copy, or a web URL
+ :param size:
+ read only the specified number of bytes; otherwise, read until EOF
+
+ """
+ prop = self.metadata[prop]
+ record = self._storage.get(guid)
+ seqno = self._seqno.next()
+
+ _logger.debug('Received %r BLOB property for %s[%s]',
+ prop.name, self.metadata.name, guid)
+
+ if not mime_type:
+ mime_type = prop.mime_type
+ record.set_blob(prop.name, data, size, seqno=seqno,
+ mime_type=mime_type, **kwargs)
+
+ if record.consistent:
+ self._post(guid, {'seqno': seqno}, False)
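+
+ # Usage sketch, illustrative only: `preview` is a hypothetical BLOB
+ # property; a stream, a file path, or a URL can be passed as `data`.
+ #
+ #     directory.set_blob(guid, 'preview', '/tmp/preview.png',
+ #             mime_type='image/png')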
+
+ def populate(self):
+ """Populate the index.
+
+ This function needs to be called right after initialization to pick up
+ possible pending changes made during a previous session when the index
+ was not properly closed.
+
+ :returns:
+ the function is a generator that yields after picking up
+ every object, to let the caller execute urgent tasks
+
+ """
+ found = False
+ migrate = (self._index.mtime == 0)
+
+ for guid in self._storage.walk(self._index.mtime):
+ if not found:
+ _logger.info('Start populating %r index', self.metadata.name)
+ found = True
+
+ if migrate:
+ self._storage.migrate(guid)
+
+ record = self._storage.get(guid)
+ try:
+ props = {}
+ for name, prop in self.metadata.items():
+ if not isinstance(prop, StoredProperty):
+ continue
+ meta = record.get(name)
+ if meta is not None:
+ props[name] = meta['value']
+ self._index.store(guid, props, None, None, None)
+ yield
+ except Exception:
+ util.exception('Cannot populate %r in %r, invalidate it',
+ guid, self.metadata.name)
+ record.invalidate()
+
+ self._index.checkpoint()
+ if found:
+ self._save_layout()
+ self.commit()
+ self._notify({'event': 'populate'})
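+
+ # Usage sketch, illustrative only: the generator needs to be exhausted for
+ # the whole index to be populated.
+ #
+ #     for __ in directory.populate():
+ #         pass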
+
+ def diff(self, in_seq, out_seq, **kwargs):
+ if 'group_by' in kwargs:
+ # Pick up only the most recent change
+ kwargs['order_by'] = '-seqno'
+ else:
+ kwargs['order_by'] = 'seqno'
+ # TODO On big requests, xapian can raise an exception on edits
+ kwargs['limit'] = env.MAX_LIMIT
+ kwargs['no_cache'] = True
+
+ for start, end in in_seq:
+ query = 'seqno:%s..' % start
+ if end:
+ query += str(end)
+ documents, __ = self.find(query=query, **kwargs)
+
+ for doc in documents:
+ diff = {}
+ diff_seq = env.Sequence()
+ for name in self.metadata.keys():
+ if name == 'seqno':
+ continue
+ meta = doc.meta(name)
+ if meta is None:
+ continue
+ seqno = meta.get('seqno')
+ if seqno not in in_seq:
+ continue
+ prop = diff[name] = {'mtime': meta['mtime']}
+ for i in ('value', 'mime_type', 'digest', 'path', 'url'):
+ if i in meta:
+ prop[i] = meta[i]
+ diff_seq.include(seqno, seqno)
+ yield doc.guid, diff
+ out_seq.include(diff_seq)
+
+ def merge(self, guid, diff, increment_seqno=True):
+ """Apply changes for documents."""
+ record = self._storage.get(guid)
+ seqno = None
+ merged = False
+
+ for prop, meta in diff.items():
+ orig_meta = record.get(prop)
+ if orig_meta is not None and orig_meta['mtime'] >= meta['mtime']:
+ continue
+ if increment_seqno:
+ if not seqno:
+ seqno = self._seqno.next()
+ meta['seqno'] = seqno
+ else:
+ meta['seqno'] = (orig_meta or {}).get('seqno') or 0
+ record.set(prop, **meta)
+ merged = True
+
+ if merged and record.consistent:
+ props = {}
+ if seqno:
+ props['seqno'] = seqno
+ self._index.store(guid, props, False,
+ self._pre_store, self._post_store,
+ # No need for an after-merge event; the later commit event
+ # is enough and avoids an event flood during node synchronization
+ None, False)
+
+ return seqno
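+
+ # Synchronization sketch, illustrative only: `local` and `remote` are two
+ # Directory objects for the same document class, `in_seq` and `out_seq`
+ # are `env.Sequence` instances.
+ #
+ #     for guid, patch in remote.diff(in_seq, out_seq):
+ #         local.merge(guid, patch)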
+
+ def _pre_store(self, guid, changes, event, increment_seqno):
+ seqno = changes.get('seqno')
+ if increment_seqno and not seqno:
+ seqno = changes['seqno'] = self._seqno.next()
+
+ record = self._storage.get(guid)
+ existed = record.exists
+
+ for name, prop in self.metadata.items():
+ if not isinstance(prop, StoredProperty):
+ continue
+ value = changes.get(name)
+ if value is None:
+ if existed:
+ meta = record.get(name)
+ if meta is not None:
+ value = meta['value']
+ changes[name] = prop.default if value is None else value
+ else:
+ if prop.localized:
+ if not isinstance(value, dict):
+ value = {env.DEFAULT_LANG: value}
+ if existed:
+ meta = record.get(name)
+ if meta is not None:
+ meta['value'].update(value)
+ value = meta['value']
+ changes[name] = value
+ record.set(name, value=value, seqno=seqno)
+
+ def _post_store(self, guid, changes, event, increment_seqno):
+ if event:
+ self._notify(event)
+
+ def _post_delete(self, guid, event):
+ self._storage.delete(guid)
+ self._notify(event)
+
+ def _post_commit(self):
+ self._seqno.commit()
+ self._notify({'event': 'commit'})
+
+ def _post(self, guid, props, new):
+ event = {'event': 'create' if new else 'update',
+ 'props': props.copy(),
+ 'guid': guid,
+ }
+ self._index.store(guid, props, new,
+ self._pre_store, self._post_store, event, True)
+
+ def _notify(self, event):
+ if self._notification_cb is not None:
+ event['document'] = self.metadata.name
+ self._notification_cb(event)
+
+ def _save_layout(self):
+ path = join(self._root, 'layout')
+ with util.new_file(path) as f:
+ f.write(str(_LAYOUT_VERSION))
+
+ def _is_layout_stale(self):
+ path = join(self._root, 'layout')
+ if not exists(path):
+ return True
+ with open(path) as f:
+ version = f.read()
+ return not version.isdigit() or int(version) != _LAYOUT_VERSION
+
+
+class _SessionSeqno(object):
+
+ def __init__(self):
+ self._value = 0
+
+ @property
+ def value(self):
+ return self._value
+
+ def next(self):
+ self._value += 1
+ return self._value
+
+ def commit(self):
+ pass