From 0e1fdae2ede40b64027f8c68bd6a562e3b217f67 Mon Sep 17 00:00:00 2001 From: Aleksey Lim Date: Sun, 10 Nov 2013 21:49:23 +0000 Subject: Update node stats while mergin subnode data on master --- diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py index 60d5c57..50f209e 100644 --- a/sugar_network/db/directory.py +++ b/sugar_network/db/directory.py @@ -35,7 +35,7 @@ _logger = logging.getLogger('db.directory') class Directory(object): - def __init__(self, root, document_class, index_class, + def __init__(self, root, resource_class, index_class, broadcast=None, seqno=None): """ :param index_class: @@ -47,14 +47,14 @@ class Directory(object): if not exists(root): os.makedirs(root) - if document_class.metadata is None: + if resource_class.metadata is None: # Metadata cannot be recreated - document_class.metadata = Metadata(document_class) - document_class.metadata['guid'] = IndexedProperty('guid', + resource_class.metadata = Metadata(resource_class) + resource_class.metadata['guid'] = IndexedProperty('guid', slot=0, prefix=GUID_PREFIX, acl=ACL.CREATE | ACL.READ) - self.metadata = document_class.metadata + self.metadata = resource_class.metadata - self.document_class = document_class + self.resource_class = resource_class self.broadcast = broadcast or (lambda event: None) self._index_class = index_class self._root = root @@ -119,7 +119,7 @@ class Directory(object): # XXX Setters are being proccessed on routes level, but, # while creating resources gotten from routes, it is important # to call setters as well, e.g., `author` property - doc = self.document_class(guid, None, props) + doc = self.resource_class(guid, None, props) for key, value in props.items(): prop = self.metadata.get(key) if prop is not None and prop.on_set is not None: @@ -168,7 +168,7 @@ class Directory(object): enforce(cached_props or record.exists, http.NotFound, 'Resource %r does not exist in %r', guid, self.metadata.name) - return self.document_class(guid, record, cached_props) + return self.resource_class(guid, record, cached_props) def find(self, **kwargs): mset = self._index.find(**kwargs) @@ -177,7 +177,7 @@ class Directory(object): for hit in mset: guid = hit.document.get_value(0) record = self._storage.get(guid) - yield self.document_class(guid, record) + yield self.resource_class(guid, record) return iterate(), mset.get_matches_estimated() @@ -285,11 +285,12 @@ class Directory(object): yield doc.guid, patch() - def merge(self, guid, diff, shift_seqno=True, **kwargs): + def merge(self, guid, diff, shift_seqno=True, op=None, **kwargs): """Apply changes for documents.""" record = self._storage.get(guid) seqno = None - merged = False + merge = {} + patch = {} for prop, meta in diff.items(): orig_meta = record.get(prop) @@ -302,11 +303,18 @@ class Directory(object): else: meta['seqno'] = (orig_meta or {}).get('seqno') or 0 meta.update(kwargs) - record.set(prop, **meta) + merge[prop] = meta + patch[prop] = meta.get('value') - merged = True + if not merge: + return seqno, False + + if op is not None: + op(patch) + for prop, meta in merge.items(): + record.set(prop, **meta) - if merged and record.consistent: + if record.consistent: props = {} if seqno: props['seqno'] = seqno @@ -314,7 +322,7 @@ class Directory(object): # is enough to avoid events flow on nodes synchronization self._index.store(guid, props, self._pre_store, self._post_store) - return seqno, merged + return seqno, True def _open(self): if not exists(self._root): @@ -329,7 +337,7 @@ class Directory(object): self._storage = Storage(self._root, self.metadata) self._index = self._index_class(index_path, self.metadata, self._post_commit) - _logger.debug('Open %r resource', self.document_class) + _logger.debug('Open %r resource', self.resource_class) def _pre_store(self, guid, changes, event=None): seqno = changes.get('seqno') diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py index 79682d1..bff9fc5 100644 --- a/sugar_network/db/routes.py +++ b/sugar_network/db/routes.py @@ -166,7 +166,7 @@ class Routes(object): if request.guid: doc = directory.get(request.guid) else: - doc = directory.document_class(None, {}) + doc = directory.resource_class(None, {}) doc.request = request blobs = [] diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py index a121a4b..9903fd8 100644 --- a/sugar_network/node/master.py +++ b/sugar_network/node/master.py @@ -172,7 +172,8 @@ class MasterRoutes(NodeRoutes): if self._files is not None: cookie['files_pull'].include(packet['sequence']) elif packet.name == 'diff': - seq, ack_seq = volume.merge(self.volume, packet) + seq, ack_seq = volume.merge(self.volume, packet, + stats=self._stats) reply.append(('ack', { 'ack': ack_seq, 'sequence': seq, diff --git a/sugar_network/node/stats_node.py b/sugar_network/node/stats_node.py index 2ca8ee6..ec54fc3 100644 --- a/sugar_network/node/stats_node.py +++ b/sugar_network/node/stats_node.py @@ -282,6 +282,9 @@ class _FeedbackStats(_ResourceStats): self.solutions += 1 else: self.solutions -= 1 + elif request.method == 'DELETE': + if self._directory.get(request.guid)['solution']: + self.solutions -= 1 def commit(self): result = _ResourceStats.commit(self) diff --git a/sugar_network/node/volume.py b/sugar_network/node/volume.py index dcc0ab7..00ff983 100644 --- a/sugar_network/node/volume.py +++ b/sugar_network/node/volume.py @@ -37,14 +37,14 @@ def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None, layer = [layer] layer.append('common') try: - for document, directory in volume.items(): - if ignore_documents and document in ignore_documents: + for resource, directory in volume.items(): + if ignore_documents and resource in ignore_documents: continue coroutine.dispatch() directory.commit() - yield {'resource': document} + yield {'resource': resource} for guid, patch in directory.diff(in_seq, exclude_seq, - layer=layer if document in _LIMITED_RESOURCES else None): + layer=layer if resource in _LIMITED_RESOURCES else None): adiff = {} adiff_seq = toolkit.Sequence() for prop, meta, seqno in patch: @@ -64,7 +64,7 @@ def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None, headers={'Accept-Encoding': ''}) except Exception: _logger.exception('Cannot fetch %r for %s:%s:%s', - url, document, guid, prop) + url, resource, guid, prop) is_the_only_seq = False continue yield {'guid': guid, @@ -88,38 +88,47 @@ def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None, yield {'commit': out_seq} -def merge(volume, records, shift_seqno=True, node_stats=None): - document = None +def merge(volume, records, shift_seqno=True, stats=None): + resource = None directory = None commit_seq = toolkit.Sequence() merged_seq = toolkit.Sequence() synced = False for record in records: - document_ = record.get('resource') - if document_: - document = document_ - directory = volume[document_] + resource_ = record.get('resource') + if resource_: + resource = resource_ + directory = volume[resource_] continue if 'guid' in record: - enforce(document, 'Invalid merge, no document') - + guid = record['guid'] + layer = [] + existed = directory.exists(guid) + if existed: + layer = directory.get(guid)['layer'] + + def update_stats(upd): + method = 'PUT' if existed else 'POST' + if ('deleted' in layer) != ('deleted' in upd.get('layer', [])): + if 'deleted' in layer: + # TODO + enforce(not 'supported yet') + else: + method = 'DELETE' + stats.log(Request( + method=method, + path=[resource, guid], + content=upd, + )) + + if stats is not None: + record['op'] = update_stats seqno, merged = directory.merge(shift_seqno=shift_seqno, **record) synced = synced or merged if seqno is not None: merged_seq.include(seqno, seqno) - - if node_stats is not None and document == 'review': - request = Request(method='POST', path=[document]) - patch = record['diff'] - request.content = { - 'context': patch['context']['value'], - 'rating': patch['rating']['value'], - } - if 'artifact' in patch: - request.content['artifact'] = patch['artifact']['value'] - node_stats.log(request) continue commit = record.get('commit') diff --git a/tests/units/node/volume.py b/tests/units/node/volume.py index 77bf5ba..8bd10df 100755 --- a/tests/units/node/volume.py +++ b/tests/units/node/volume.py @@ -13,9 +13,13 @@ from sugar_network import db, toolkit, model from sugar_network.node.volume import diff, merge from sugar_network.node.stats_node import stats_node_step, Sniffer from sugar_network.node.routes import NodeRoutes +from sugar_network.toolkit.rrd import Rrd from sugar_network.toolkit.router import Router, Request, Response, fallbackroute, Blob, ACL, route +current_time = time.time + + class VolumeTest(tests.Test): def setUp(self): @@ -344,59 +348,240 @@ class VolumeTest(tests.Test): self.assertEqual(([[1, 3]], [[101, 101]]), merge(volume, records)) assert volume['document'].exists('1') - def test_merge_UpdateReviewStats(self): + def test_merge_UpdateStats(self): stats_node_step.value = 1 volume = db.Volume('db', model.RESOURCES) cp = NodeRoutes('guid', volume) stats = Sniffer(volume) - context = call(cp, method='POST', document='context', content={ - 'guid': 'context', - 'type': 'package', - 'title': 'title', - 'summary': 'summary', - 'description': 'description', - }) - artifact = call(cp, method='POST', document='artifact', content={ - 'guid': 'artifact', - 'type': 'instance', - 'context': context, - 'title': '', - 'description': '', - }) - records = [ + {'resource': 'context'}, + {'guid': 'context', 'diff': { + 'guid': {'value': 'context', 'mtime': 1.0}, + 'ctime': {'value': 1, 'mtime': 1.0}, + 'mtime': {'value': 1, 'mtime': 1.0}, + 'type': {'value': ['package'], 'mtime': 1.0}, + 'title': {'value': {}, 'mtime': 1.0}, + 'summary': {'value': {}, 'mtime': 1.0}, + 'description': {'value': {}, 'mtime': 1.0}, + }}, + {'resource': 'artifact'}, + {'guid': 'artifact', 'diff': { + 'guid': {'value': 'artifact', 'mtime': 1.0}, + 'ctime': {'value': 1, 'mtime': 1.0}, + 'mtime': {'value': 1, 'mtime': 1.0}, + 'type': {'value': ['instance'], 'mtime': 1.0}, + 'context': {'value': 'context', 'mtime': 1.0}, + 'title': {'value': {}, 'mtime': 1.0}, + 'description': {'value': {}, 'mtime': 1.0}, + }}, {'resource': 'review'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 1.0}, + {'guid': 'context_review', 'diff': { + 'guid': {'value': 'context_review', 'mtime': 1.0}, 'ctime': {'value': 1, 'mtime': 1.0}, 'mtime': {'value': 1, 'mtime': 1.0}, - 'context': {'value': context, 'mtime': 1.0}, - 'artifact': {'value': artifact, 'mtime': 4.0}, + 'context': {'value': 'context', 'mtime': 1.0}, + 'artifact': {'value': 'artifact', 'mtime': 4.0}, 'rating': {'value': 1, 'mtime': 1.0}, 'author': {'mtime': 1, 'value': {}}, 'layer': {'mtime': 1, 'value': []}, 'tags': {'mtime': 1, 'value': []}, }}, - {'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': 2.0}, - 'ctime': {'value': 2, 'mtime': 2.0}, - 'mtime': {'value': 2, 'mtime': 2.0}, - 'context': {'value': context, 'mtime': 2.0}, - 'rating': {'value': 2, 'mtime': 2.0}, - 'author': {'mtime': 2, 'value': {}}, - 'layer': {'mtime': 2, 'value': []}, - 'tags': {'mtime': 2, 'value': []}, + {'guid': 'artifact_review', 'diff': { + 'guid': {'value': 'artifact_review', 'mtime': 1.0}, + 'ctime': {'value': 1, 'mtime': 1.0}, + 'mtime': {'value': 1, 'mtime': 1.0}, + 'context': {'value': 'context', 'mtime': 1.0}, + 'rating': {'value': 1, 'mtime': 1.0}, + 'author': {'mtime': 1, 'value': {}}, + 'layer': {'mtime': 1, 'value': []}, + 'tags': {'mtime': 1, 'value': []}, }}, - {'commit': [[1, 2]]}, + {'resource': 'feedback'}, + {'guid': 'feedback_1', 'diff': { + 'guid': {'value': 'feedback_1', 'mtime': 1.0}, + 'ctime': {'value': 1, 'mtime': 1.0}, + 'mtime': {'value': 1, 'mtime': 1.0}, + 'type': {'value': ['idea'], 'mtime': 1.0}, + 'context': {'value': 'context', 'mtime': 1.0}, + 'title': {'value': {}, 'mtime': 1.0}, + 'content': {'value': {}, 'mtime': 1.0}, + 'solution': {'value': 'solution_1', 'mtime': 1.0}, + }}, + {'guid': 'feedback_2', 'diff': { + 'guid': {'value': 'feedback_2', 'mtime': 1.0}, + 'ctime': {'value': 1, 'mtime': 1.0}, + 'mtime': {'value': 1, 'mtime': 1.0}, + 'type': {'value': ['idea'], 'mtime': 1.0}, + 'context': {'value': 'context', 'mtime': 1.0}, + 'title': {'value': {}, 'mtime': 1.0}, + 'content': {'value': {}, 'mtime': 1.0}, + 'solution': {'value': 'solution_2', 'mtime': 1.0}, + }}, + {'resource': 'solution'}, + {'guid': 'solution_1', 'diff': { + 'guid': {'value': 'solution_1', 'mtime': 1.0}, + 'ctime': {'value': 1, 'mtime': 1.0}, + 'mtime': {'value': 1, 'mtime': 1.0}, + 'context': {'value': 'context', 'mtime': 1.0}, + 'feedback': {'value': 'feedback_1', 'mtime': 1.0}, + 'content': {'value': {}, 'mtime': 1.0}, + }}, + {'guid': 'solution_2', 'diff': { + 'guid': {'value': 'solution_2', 'mtime': 1.0}, + 'ctime': {'value': 1, 'mtime': 1.0}, + 'mtime': {'value': 1, 'mtime': 1.0}, + 'context': {'value': 'context', 'mtime': 1.0}, + 'feedback': {'value': 'feedback_1', 'mtime': 1.0}, + 'content': {'value': {}, 'mtime': 1.0}, + }}, + {'resource': 'implementation'}, + {'guid': 'implementation', 'diff': { + 'guid': {'value': 'implementation', 'mtime': 1.0}, + 'ctime': {'value': 1, 'mtime': 1.0}, + 'mtime': {'value': 1, 'mtime': 1.0}, + 'context': {'value': 'context', 'mtime': 1.0}, + 'license': {'value': ['GPL-3.0'], 'mtime': 1.0}, + 'version': {'value': '1', 'mtime': 1.0}, + 'stability': {'value': 'stable', 'mtime': 1.0}, + 'notes': {'value': {}, 'mtime': 1.0}, + }}, + {'commit': [[1, 1]]}, ] - merge(volume, records, node_stats=stats) + merge(volume, records, stats=stats) + ts = int(current_time()) + stats.commit(ts) + + self.assertEqual([ + [('feedback', ts, { + 'solutions': 2.0, + 'total': 2.0, + 'commented': 0.0, + })], + [('review', ts, { + 'total': 2.0, + 'commented': 0.0, + })], + [('solution', ts, { + 'total': 2.0, + 'commented': 0.0, + })], + [('artifact', ts, { + 'reviewed': 1.0, + 'downloaded': 0.0, + 'total': 1.0, + })], + [('user', ts, { + 'total': 0.0, + })], + [('context', ts, { + 'failed': 0.0, + 'reviewed': 1.0, + 'downloaded': 0.0, + 'total': 1.0, + 'released': 1.0, + })], + ], + [[(j.name,) + i for i in j.get(j.last, j.last)] for j in Rrd('stats/node', 1)]) + self.assertEqual(1, volume['artifact'].get('artifact')['rating']) + self.assertEqual([1, 1], volume['artifact'].get('artifact')['reviews']) + self.assertEqual(1, volume['context'].get('context')['rating']) + self.assertEqual([1, 1], volume['context'].get('context')['reviews']) - stats.commit() - self.assertEqual(1, volume['artifact'].get(artifact)['rating']) - self.assertEqual([1, 1], volume['artifact'].get(artifact)['reviews']) - self.assertEqual(2, volume['context'].get(context)['rating']) - self.assertEqual([1, 2], volume['context'].get(context)['reviews']) + records = [ + {'resource': 'feedback'}, + {'guid': 'feedback_2', 'diff': {'solution': {'value': '', 'mtime': 2.0}}}, + {'commit': [[2, 2]]}, + ] + merge(volume, records, stats=stats) + ts += 1 + stats.commit(ts) + + self.assertEqual([ + [('feedback', ts, { + 'solutions': 1.0, + 'total': 2.0, + 'commented': 0.0, + })], + [('review', ts, { + 'total': 2.0, + 'commented': 0.0, + })], + [('solution', ts, { + 'total': 2.0, + 'commented': 0.0, + })], + [('artifact', ts, { + 'reviewed': 0.0, + 'downloaded': 0.0, + 'total': 1.0, + })], + [('user', ts, { + 'total': 0.0, + })], + [('context', ts, { + 'failed': 0.0, + 'reviewed': 0.0, + 'downloaded': 0.0, + 'total': 1.0, + 'released': 0.0, + })], + ], + [[(j.name,) + i for i in j.get(j.last, j.last)] for j in Rrd('stats/node', 1)]) + + records = [ + {'resource': 'context'}, + {'guid': 'context', 'diff': {'layer': {'value': ['deleted'], 'mtime': 3.0}}}, + {'resource': 'artifact'}, + {'guid': 'artifact', 'diff': {'layer': {'value': ['deleted'], 'mtime': 3.0}}}, + {'resource': 'review'}, + {'guid': 'context_review', 'diff': {'layer': {'value': ['deleted'], 'mtime': 3.0}}}, + {'guid': 'artifact_review', 'diff': {'layer': {'value': ['deleted'], 'mtime': 3.0}}}, + {'resource': 'feedback'}, + {'guid': 'feedback_1', 'diff': {'layer': {'value': ['deleted'], 'mtime': 3.0}}}, + {'guid': 'feedback_2', 'diff': {'layer': {'value': ['deleted'], 'mtime': 3.0}}}, + {'resource': 'solution'}, + {'guid': 'solution_1', 'diff': {'layer': {'value': ['deleted'], 'mtime': 3.0}}}, + {'guid': 'solution_2', 'diff': {'layer': {'value': ['deleted'], 'mtime': 3.0}}}, + {'resource': 'implementation'}, + {'guid': 'implementation', 'diff': {'layer': {'value': ['deleted'], 'mtime': 3.0}}}, + {'commit': [[3, 3]]}, + ] + merge(volume, records, stats=stats) + ts += 1 + stats.commit(ts) + + self.assertEqual([ + [('feedback', ts, { + 'solutions': 0.0, + 'total': 0.0, + 'commented': 0.0, + })], + [('review', ts, { + 'total': 0.0, + 'commented': 0.0, + })], + [('solution', ts, { + 'total': 0.0, + 'commented': 0.0, + })], + [('artifact', ts, { + 'reviewed': 0.0, + 'downloaded': 0.0, + 'total': 0.0, + })], + [('user', ts, { + 'total': 0.0, + })], + [('context', ts, { + 'failed': 0.0, + 'reviewed': 0.0, + 'downloaded': 0.0, + 'total': 0.0, + 'released': 0.0, + })], + ], + [[(j.name,) + i for i in j.get(j.last, j.last)] for j in Rrd('stats/node', 1)]) def test_diff_Blobs(self): -- cgit v0.9.1