diff options
Diffstat (limited to 'sugar_network')
-rw-r--r-- | sugar_network/db/directory.py | 40 | ||||
-rw-r--r-- | sugar_network/db/routes.py | 2 | ||||
-rw-r--r-- | sugar_network/node/master.py | 3 | ||||
-rw-r--r-- | sugar_network/node/stats_node.py | 3 | ||||
-rw-r--r-- | sugar_network/node/volume.py | 57 |
5 files changed, 63 insertions, 42 deletions
diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py index 60d5c57..50f209e 100644 --- a/sugar_network/db/directory.py +++ b/sugar_network/db/directory.py @@ -35,7 +35,7 @@ _logger = logging.getLogger('db.directory') class Directory(object): - def __init__(self, root, document_class, index_class, + def __init__(self, root, resource_class, index_class, broadcast=None, seqno=None): """ :param index_class: @@ -47,14 +47,14 @@ class Directory(object): if not exists(root): os.makedirs(root) - if document_class.metadata is None: + if resource_class.metadata is None: # Metadata cannot be recreated - document_class.metadata = Metadata(document_class) - document_class.metadata['guid'] = IndexedProperty('guid', + resource_class.metadata = Metadata(resource_class) + resource_class.metadata['guid'] = IndexedProperty('guid', slot=0, prefix=GUID_PREFIX, acl=ACL.CREATE | ACL.READ) - self.metadata = document_class.metadata + self.metadata = resource_class.metadata - self.document_class = document_class + self.resource_class = resource_class self.broadcast = broadcast or (lambda event: None) self._index_class = index_class self._root = root @@ -119,7 +119,7 @@ class Directory(object): # XXX Setters are being proccessed on routes level, but, # while creating resources gotten from routes, it is important # to call setters as well, e.g., `author` property - doc = self.document_class(guid, None, props) + doc = self.resource_class(guid, None, props) for key, value in props.items(): prop = self.metadata.get(key) if prop is not None and prop.on_set is not None: @@ -168,7 +168,7 @@ class Directory(object): enforce(cached_props or record.exists, http.NotFound, 'Resource %r does not exist in %r', guid, self.metadata.name) - return self.document_class(guid, record, cached_props) + return self.resource_class(guid, record, cached_props) def find(self, **kwargs): mset = self._index.find(**kwargs) @@ -177,7 +177,7 @@ class Directory(object): for hit in mset: guid = hit.document.get_value(0) record = self._storage.get(guid) - yield self.document_class(guid, record) + yield self.resource_class(guid, record) return iterate(), mset.get_matches_estimated() @@ -285,11 +285,12 @@ class Directory(object): yield doc.guid, patch() - def merge(self, guid, diff, shift_seqno=True, **kwargs): + def merge(self, guid, diff, shift_seqno=True, op=None, **kwargs): """Apply changes for documents.""" record = self._storage.get(guid) seqno = None - merged = False + merge = {} + patch = {} for prop, meta in diff.items(): orig_meta = record.get(prop) @@ -302,11 +303,18 @@ class Directory(object): else: meta['seqno'] = (orig_meta or {}).get('seqno') or 0 meta.update(kwargs) - record.set(prop, **meta) + merge[prop] = meta + patch[prop] = meta.get('value') - merged = True + if not merge: + return seqno, False + + if op is not None: + op(patch) + for prop, meta in merge.items(): + record.set(prop, **meta) - if merged and record.consistent: + if record.consistent: props = {} if seqno: props['seqno'] = seqno @@ -314,7 +322,7 @@ class Directory(object): # is enough to avoid events flow on nodes synchronization self._index.store(guid, props, self._pre_store, self._post_store) - return seqno, merged + return seqno, True def _open(self): if not exists(self._root): @@ -329,7 +337,7 @@ class Directory(object): self._storage = Storage(self._root, self.metadata) self._index = self._index_class(index_path, self.metadata, self._post_commit) - _logger.debug('Open %r resource', self.document_class) + _logger.debug('Open %r resource', self.resource_class) def _pre_store(self, guid, changes, event=None): seqno = changes.get('seqno') diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py index 79682d1..bff9fc5 100644 --- a/sugar_network/db/routes.py +++ b/sugar_network/db/routes.py @@ -166,7 +166,7 @@ class Routes(object): if request.guid: doc = directory.get(request.guid) else: - doc = directory.document_class(None, {}) + doc = directory.resource_class(None, {}) doc.request = request blobs = [] diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py index a121a4b..9903fd8 100644 --- a/sugar_network/node/master.py +++ b/sugar_network/node/master.py @@ -172,7 +172,8 @@ class MasterRoutes(NodeRoutes): if self._files is not None: cookie['files_pull'].include(packet['sequence']) elif packet.name == 'diff': - seq, ack_seq = volume.merge(self.volume, packet) + seq, ack_seq = volume.merge(self.volume, packet, + stats=self._stats) reply.append(('ack', { 'ack': ack_seq, 'sequence': seq, diff --git a/sugar_network/node/stats_node.py b/sugar_network/node/stats_node.py index 2ca8ee6..ec54fc3 100644 --- a/sugar_network/node/stats_node.py +++ b/sugar_network/node/stats_node.py @@ -282,6 +282,9 @@ class _FeedbackStats(_ResourceStats): self.solutions += 1 else: self.solutions -= 1 + elif request.method == 'DELETE': + if self._directory.get(request.guid)['solution']: + self.solutions -= 1 def commit(self): result = _ResourceStats.commit(self) diff --git a/sugar_network/node/volume.py b/sugar_network/node/volume.py index dcc0ab7..00ff983 100644 --- a/sugar_network/node/volume.py +++ b/sugar_network/node/volume.py @@ -37,14 +37,14 @@ def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None, layer = [layer] layer.append('common') try: - for document, directory in volume.items(): - if ignore_documents and document in ignore_documents: + for resource, directory in volume.items(): + if ignore_documents and resource in ignore_documents: continue coroutine.dispatch() directory.commit() - yield {'resource': document} + yield {'resource': resource} for guid, patch in directory.diff(in_seq, exclude_seq, - layer=layer if document in _LIMITED_RESOURCES else None): + layer=layer if resource in _LIMITED_RESOURCES else None): adiff = {} adiff_seq = toolkit.Sequence() for prop, meta, seqno in patch: @@ -64,7 +64,7 @@ def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None, headers={'Accept-Encoding': ''}) except Exception: _logger.exception('Cannot fetch %r for %s:%s:%s', - url, document, guid, prop) + url, resource, guid, prop) is_the_only_seq = False continue yield {'guid': guid, @@ -88,38 +88,47 @@ def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None, yield {'commit': out_seq} -def merge(volume, records, shift_seqno=True, node_stats=None): - document = None +def merge(volume, records, shift_seqno=True, stats=None): + resource = None directory = None commit_seq = toolkit.Sequence() merged_seq = toolkit.Sequence() synced = False for record in records: - document_ = record.get('resource') - if document_: - document = document_ - directory = volume[document_] + resource_ = record.get('resource') + if resource_: + resource = resource_ + directory = volume[resource_] continue if 'guid' in record: - enforce(document, 'Invalid merge, no document') - + guid = record['guid'] + layer = [] + existed = directory.exists(guid) + if existed: + layer = directory.get(guid)['layer'] + + def update_stats(upd): + method = 'PUT' if existed else 'POST' + if ('deleted' in layer) != ('deleted' in upd.get('layer', [])): + if 'deleted' in layer: + # TODO + enforce(not 'supported yet') + else: + method = 'DELETE' + stats.log(Request( + method=method, + path=[resource, guid], + content=upd, + )) + + if stats is not None: + record['op'] = update_stats seqno, merged = directory.merge(shift_seqno=shift_seqno, **record) synced = synced or merged if seqno is not None: merged_seq.include(seqno, seqno) - - if node_stats is not None and document == 'review': - request = Request(method='POST', path=[document]) - patch = record['diff'] - request.content = { - 'context': patch['context']['value'], - 'rating': patch['rating']['value'], - } - if 'artifact' in patch: - request.content['artifact'] = patch['artifact']['value'] - node_stats.log(request) continue commit = record.get('commit') |