From 6b1d747f40416c8a70b70953c7c0349e08d46507 Mon Sep 17 00:00:00 2001 From: Aleksey Lim Date: Fri, 22 Nov 2013 15:22:07 +0000 Subject: Avoid loosing node stats on restarts --- diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py index f7fb9f4..dece569 100644 --- a/sugar_network/node/routes.py +++ b/sugar_network/node/routes.py @@ -61,8 +61,7 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): def close(self): if self._stats is not None: - self._stats.commit() - self._stats.commit_objects() + self._stats.suspend() @property def guid(self): diff --git a/sugar_network/node/stats_node.py b/sugar_network/node/stats_node.py index fb9ef5d..53f4676 100644 --- a/sugar_network/node/stats_node.py +++ b/sugar_network/node/stats_node.py @@ -13,7 +13,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import os +import time +import json import logging +from os.path import exists, join from sugar_network.toolkit.rrd import Rrd from sugar_network.toolkit import Option @@ -49,27 +53,47 @@ class Sniffer(object): self._volume = volume self._rrd = Rrd(path, stats_node_step.value, stats_node_rras.value) self._stats = {} + self._suspend_path = join(path, '.suspend') + self._last = int(time.time()) for name, cls in _STATS.items(): stats = self._stats[name] = cls(self._stats, volume, reset) fields = {} - for attr in dir(stats): - if attr[0] == '_' or attr[0].isupper() or \ - type(getattr(stats, attr)) not in (int, long): - continue - if attr == 'total': + for field in stats: + if field == 'total': dst = 'GAUGE' limit = 60 * 60 * 24 * 365 else: dst = 'ABSOLUTE' limit = stats_node_step.value - fields[attr] = 'DS:%s:%s:%s:U:U' % (attr, dst, limit) + fields[field] = 'DS:%s:%s:%s:U:U' % (field, dst, limit) if fields: self._rrd[name].fields = fields + if exists(self._suspend_path): + with file(self._suspend_path) as f: + suspend = json.load(f) + for name, stats in self._stats.items(): + if name not in suspend['state']: + continue + total_stats, stats.objects = suspend['state'][name] + stats.update(total_stats) + if suspend['timestamp'] < int(time.time()): + self.commit(suspend['timestamp']) + self.commit_objects() + os.unlink(self._suspend_path) + def __getitem__(self, name): return self._rrd[name] + def suspend(self): + state = dict([(i, (j, j.objects)) for i, j in self._stats.items()]) + with file(self._suspend_path, 'w') as f: + json.dump({ + 'timestamp': self._last + stats_node_step.value, + 'state': state, + }, f) + def log(self, request): if request.cmd or request.resource not in _STATS: return @@ -81,45 +105,44 @@ class Sniffer(object): for resource, stats in self._stats.items(): if resource not in self._rrd: continue - values = {} - for field in self._rrd[resource].fields: - values[field] = getattr(stats, field) + values = stats.copy() + for field in stats: if field != 'total': - setattr(stats, field, 0) + stats[field] = 0 if extra_values and resource in extra_values: values.update(extra_values[resource]) if values: self._rrd[resource].put(values, timestamp=timestamp) + self._last = timestamp or int(time.time()) + def commit_objects(self, reset=False): _logger.trace('Commit object stats') for resource, stats in self._stats.items(): - obj = { + old = { 'downloads': 0, 'reviews': (0, 0), } directory = self._volume[resource] - for guid, obj_stats in stats.active.items(): - if not obj_stats.reviews and not obj_stats.downloads: - continue + for guid, new in stats.objects.items(): if not directory.exists(guid): _logger.warning('Ignore stats for missed %r %s', guid, resource) continue if not reset: - obj = directory.get(guid) + old = directory.get(guid) patch = {} - if obj_stats.downloads: - patch['downloads'] = obj_stats.downloads + obj['downloads'] - if obj_stats.reviews: - reviews, rating = obj['reviews'] - reviews += obj_stats.reviews - rating += obj_stats.rating + if 'downloads' in new: + patch['downloads'] = new['downloads'] + old['downloads'] + if 'reviews' in new: + reviews, rating = old['reviews'] + reviews += new['reviews'] + rating += new['rating'] patch['reviews'] = [reviews, rating] patch['rating'] = int(round(float(rating) / reviews)) directory.update(guid, patch) - stats.active.clear() + stats.objects.clear() def report(self, dbs, start, end, records): result = {} @@ -148,28 +171,22 @@ class Sniffer(object): return result -class _ObjectStats(object): - - downloads = 0 - reviews = 0 - rating = 0 - - -class _Stats(object): +class _Stats(dict): RESOURCE = None OWNERS = [] def __init__(self, stats, volume, reset): - self.active = {} + self.objects = {} self._stats = stats self._volume = volume - def __getitem__(self, guid): - result = self.active.get(guid) - if result is None: - result = self.active[guid] = _ObjectStats() - return result + def inc(self, guid, prop, value=1): + obj = self.objects.setdefault(guid, {}) + if prop not in obj: + obj[prop] = value + else: + obj[prop] += value def log(self, request): pass @@ -177,18 +194,18 @@ class _Stats(object): class _ResourceStats(_Stats): - total = 0 - def __init__(self, stats, volume, reset): _Stats.__init__(self, stats, volume, reset) - if not reset: - self.total = volume[self.RESOURCE].find(limit=0)[1] + if reset: + self['total'] = 0 + else: + self['total'] = volume[self.RESOURCE].find(limit=0)[1] def log(self, request): if request.method == 'POST': - self.total += 1 + self['total'] += 1 elif request.method == 'DELETE': - self.total -= 1 + self['total'] -= 1 def parse_context(self, request): context = None @@ -233,9 +250,11 @@ class _ContextStats(_ResourceStats): RESOURCE = 'context' - released = 0 - failed = 0 - downloaded = 0 + def __init__(self, stats, volume, reset): + _ResourceStats.__init__(self, stats, volume, reset) + self['released'] = 0 + self['failed'] = 0 + self['downloaded'] = 0 class _ImplementationStats(_Stats): @@ -247,10 +266,10 @@ class _ImplementationStats(_Stats): if request.method == 'GET': if request.prop == 'data': context = self._volume[self.RESOURCE].get(request.guid) - self._stats['context'][context.context].downloads += 1 - self._stats['context'].downloaded += 1 + self._stats['context'].inc(context.context, 'downloads') + self._stats['context']['downloaded'] += 1 elif request.method == 'POST': - self._stats['context'].released += 1 + self._stats['context']['released'] += 1 class _ReportStats(_Stats): @@ -260,7 +279,7 @@ class _ReportStats(_Stats): def log(self, request): if request.method == 'POST': - self._stats['context'].failed += 1 + self._stats['context']['failed'] += 1 class _ReviewStats(_ResourceStats): @@ -273,12 +292,13 @@ class _ReviewStats(_ResourceStats): if request.method == 'POST': if request.content.get('artifact'): - artifact = self._stats['artifact'] - stats = artifact[request.content['artifact']] + stats = self._stats['artifact'] + guid = request.content['artifact'] else: - stats = self._stats['context'][self.parse_context(request)] - stats.reviews += 1 - stats.rating += request.content.get('rating') or 0 + stats = self._stats['context'] + guid = self.parse_context(request) + stats.inc(guid, 'reviews') + stats.inc(guid, 'rating', request.content.get('rating') or 0) class _FeedbackStats(_ResourceStats): @@ -298,15 +318,17 @@ class _ArtifactStats(_ResourceStats): RESOURCE = 'artifact' OWNERS = ['context'] - downloaded = 0 + def __init__(self, stats, volume, reset): + _ResourceStats.__init__(self, stats, volume, reset) + self['downloaded'] = 0 def log(self, request): _ResourceStats.log(self, request) if request.method == 'GET': if request.prop == 'data': - self[request.guid].downloads += 1 - self.downloaded += 1 + self.inc(request.guid, 'downloads') + self['downloaded'] += 1 class _CommentStats(_ResourceStats): diff --git a/tests/units/node/stats_node.py b/tests/units/node/stats_node.py index da2f3e7..aec9102 100755 --- a/tests/units/node/stats_node.py +++ b/tests/units/node/stats_node.py @@ -6,7 +6,7 @@ import time from __init__ import tests from sugar_network import db, model -from sugar_network.node.stats_node import Sniffer +from sugar_network.node.stats_node import Sniffer, stats_node_step from sugar_network.toolkit.rrd import Rrd from sugar_network.toolkit.router import Request @@ -17,12 +17,12 @@ class StatsTest(tests.Test): volume = db.Volume('local', model.RESOURCES) stats = Sniffer(volume, 'stats/node') - self.assertEqual(0, stats._stats['user'].total) - self.assertEqual(0, stats._stats['context'].total) - self.assertEqual(0, stats._stats['review'].total) - self.assertEqual(0, stats._stats['feedback'].total) - self.assertEqual(0, stats._stats['solution'].total) - self.assertEqual(0, stats._stats['artifact'].total) + self.assertEqual(0, stats._stats['user']['total']) + self.assertEqual(0, stats._stats['context']['total']) + self.assertEqual(0, stats._stats['review']['total']) + self.assertEqual(0, stats._stats['feedback']['total']) + self.assertEqual(0, stats._stats['solution']['total']) + self.assertEqual(0, stats._stats['artifact']['total']) volume['user'].create({'guid': 'user', 'name': 'user', 'pubkey': ''}) volume['context'].create({'guid': 'context', 'type': 'activity', 'title': '', 'summary': '', 'description': ''}) @@ -33,12 +33,12 @@ class StatsTest(tests.Test): volume['artifact'].create({'guid': 'artifact', 'type': 'instance', 'context': 'context', 'title': '', 'description': ''}) stats = Sniffer(volume, 'stats/node') - self.assertEqual(1, stats._stats['user'].total) - self.assertEqual(1, stats._stats['context'].total) - self.assertEqual(1, stats._stats['review'].total) - self.assertEqual(2, stats._stats['feedback'].total) - self.assertEqual(1, stats._stats['solution'].total) - self.assertEqual(1, stats._stats['artifact'].total) + self.assertEqual(1, stats._stats['user']['total']) + self.assertEqual(1, stats._stats['context']['total']) + self.assertEqual(1, stats._stats['review']['total']) + self.assertEqual(2, stats._stats['feedback']['total']) + self.assertEqual(1, stats._stats['solution']['total']) + self.assertEqual(1, stats._stats['artifact']['total']) def test_POSTs(self): volume = db.Volume('local', model.RESOURCES) @@ -49,7 +49,7 @@ class StatsTest(tests.Test): stats.log(request) stats.log(request) stats.log(request) - self.assertEqual(3, stats._stats['context'].total) + self.assertEqual(3, stats._stats['context']['total']) def test_DELETEs(self): volume = db.Volume('local', model.RESOURCES) @@ -60,7 +60,7 @@ class StatsTest(tests.Test): stats.log(request) stats.log(request) stats.log(request) - self.assertEqual(-3, stats._stats['context'].total) + self.assertEqual(-3, stats._stats['context']['total']) def test_Comments(self): volume = db.Volume('local', model.RESOURCES) @@ -73,19 +73,19 @@ class StatsTest(tests.Test): request.principal = 'user' request.content = {'solution': 'solution'} stats.log(request) - self.assertEqual(1, stats._stats['comment'].total) + self.assertEqual(1, stats._stats['comment']['total']) request = Request(method='POST', path=['comment']) request.principal = 'user' request.content = {'feedback': 'feedback'} stats.log(request) - self.assertEqual(2, stats._stats['comment'].total) + self.assertEqual(2, stats._stats['comment']['total']) request = Request(method='POST', path=['comment']) request.principal = 'user' request.content = {'review': 'review'} stats.log(request) - self.assertEqual(3, stats._stats['comment'].total) + self.assertEqual(3, stats._stats['comment']['total']) def test_Reviewes(self): volume = db.Volume('local', model.RESOURCES) @@ -97,19 +97,19 @@ class StatsTest(tests.Test): request.principal = 'user' request.content = {'context': 'context', 'rating': 1} stats.log(request) - self.assertEqual(1, stats._stats['review'].total) + self.assertEqual(1, stats._stats['review']['total']) request = Request(method='POST', path=['review']) request.principal = 'user' request.content = {'context': 'context', 'artifact': '', 'rating': 2} stats.log(request) - self.assertEqual(2, stats._stats['review'].total) + self.assertEqual(2, stats._stats['review']['total']) request = Request(method='POST', path=['review']) request.principal = 'user' request.content = {'artifact': 'artifact', 'rating': 3} stats.log(request) - self.assertEqual(3, stats._stats['review'].total) + self.assertEqual(3, stats._stats['review']['total']) stats.commit_objects() self.assertEqual([2, 3], volume['context'].get('context')['reviews']) @@ -124,12 +124,12 @@ class StatsTest(tests.Test): request = Request(method='GET', path=['implementation', 'implementation', 'fake']) request.principal = 'user' stats.log(request) - self.assertEqual(0, stats._stats['context'].downloaded) + self.assertEqual(0, stats._stats['context']['downloaded']) request = Request(method='GET', path=['implementation', 'implementation', 'data']) request.principal = 'user' stats.log(request) - self.assertEqual(1, stats._stats['context'].downloaded) + self.assertEqual(1, stats._stats['context']['downloaded']) def test_ContextReleased(self): volume = db.Volume('local', model.RESOURCES) @@ -140,7 +140,7 @@ class StatsTest(tests.Test): request.principal = 'user' request.content = {'context': 'context'} stats.log(request) - self.assertEqual(1, stats._stats['context'].released) + self.assertEqual(1, stats._stats['context']['released']) def test_ContextFailed(self): volume = db.Volume('local', model.RESOURCES) @@ -151,7 +151,7 @@ class StatsTest(tests.Test): request.principal = 'user' request.content = {'context': 'context'} stats.log(request) - self.assertEqual(1, stats._stats['context'].failed) + self.assertEqual(1, stats._stats['context']['failed']) def test_ArtifactDownloaded(self): volume = db.Volume('local', model.RESOURCES) @@ -161,12 +161,12 @@ class StatsTest(tests.Test): request = Request(method='GET', path=['artifact', 'artifact', 'fake']) request.principal = 'user' stats.log(request) - self.assertEqual(0, stats._stats['artifact'].downloaded) + self.assertEqual(0, stats._stats['artifact']['downloaded']) request = Request(method='GET', path=['artifact', 'artifact', 'data']) request.principal = 'user' stats.log(request) - self.assertEqual(1, stats._stats['artifact'].downloaded) + self.assertEqual(1, stats._stats['artifact']['downloaded']) def test_Commit(self): volume = db.Volume('local', model.RESOURCES) @@ -198,25 +198,23 @@ class StatsTest(tests.Test): request.principal = 'user' stats.log(request) - self.assertEqual(1, stats._stats['user'].total) - self.assertEqual(1, stats._stats['context'].total) - self.assertEqual(1, stats._stats['review'].total) - self.assertEqual(1, stats._stats['feedback'].total) - self.assertEqual(1, stats._stats['solution'].total) - self.assertEqual(1, stats._stats['artifact'].total) + self.assertEqual(1, stats._stats['user']['total']) + self.assertEqual(1, stats._stats['context']['total']) + self.assertEqual(1, stats._stats['review']['total']) + self.assertEqual(1, stats._stats['feedback']['total']) + self.assertEqual(1, stats._stats['solution']['total']) + self.assertEqual(1, stats._stats['artifact']['total']) ts = int(time.time()) stats.commit(ts) stats.commit_objects() - self.assertEqual(1, stats._stats['user'].total) - self.assertEqual(1, stats._stats['context'].total) - self.assertEqual(1, stats._stats['review'].total) - self.assertEqual(1, stats._stats['feedback'].total) - self.assertEqual(1, stats._stats['solution'].total) - self.assertEqual(1, stats._stats['artifact'].total) - - print [[(j.name,) + i for i in j.get(j.last, j.last)] for j in Rrd('stats/node', 1)] + self.assertEqual(1, stats._stats['user']['total']) + self.assertEqual(1, stats._stats['context']['total']) + self.assertEqual(1, stats._stats['review']['total']) + self.assertEqual(1, stats._stats['feedback']['total']) + self.assertEqual(1, stats._stats['solution']['total']) + self.assertEqual(1, stats._stats['artifact']['total']) self.assertEqual([ [('comment', ts, { @@ -345,6 +343,74 @@ class StatsTest(tests.Test): self.assertEqual([2, 6], volume['artifact'].get('artifact')['reviews']) self.assertEqual(3, volume['artifact'].get('artifact')['rating']) + def test_Suspend(self): + stats_node_step.value = 5 + volume = db.Volume('local', model.RESOURCES) + volume['context'].create({'guid': 'context', 'type': 'activity', 'title': '', 'summary': '', 'description': ''}) + volume['implementation'].create({'guid': 'impl', 'context': 'context', 'license': 'GPLv3', 'version': '1', 'date': 0, 'stability': 'stable', 'notes': ''}) + + ts = self.ts = 1000000000 + self.override(time, 'time', lambda: self.ts) + + stats = Sniffer(volume, 'stats') + request = Request(method='POST', path=['context']) + stats.log(request) + request = Request(method='GET', path=['implementation', 'impl', 'data'], context='context') + stats.log(request) + stats.suspend() + + rdb = Rrd('stats', 1)['context'] + self.assertEqual([ + ], + [i for i in rdb.get(ts, ts + 10)]) + + stats = Sniffer(volume, 'stats') + stats.suspend() + + rdb = Rrd('stats', 1)['context'] + self.assertEqual([ + ], + [i for i in rdb.get(ts, ts + 10)]) + + self.ts += 6 + stats = Sniffer(volume, 'stats') + + rdb = Rrd('stats', 1)['context'] + self.assertEqual([ + (ts + 0, {'failed': 0.0, 'downloaded': 0.0, 'total': 0.0, 'released': 0.0}), + (ts + 5, {'failed': 0.0, 'downloaded': 0.2, 'total': 2.0, 'released': 0.0}), + ], + [i for i in rdb.get(ts, ts + 20)]) + + request = Request(method='POST', path=['context']) + stats.log(request) + request = Request(method='GET', path=['implementation', 'impl', 'data'], context='context') + stats.log(request) + request = Request(method='GET', path=['implementation', 'impl', 'data'], context='context') + stats.log(request) + stats.suspend() + + stats = Sniffer(volume, 'stats') + stats.suspend() + + rdb = Rrd('stats', 1)['context'] + self.assertEqual([ + (ts + 0, {'failed': 0.0, 'downloaded': 0.0, 'total': 0.0, 'released': 0.0}), + (ts + 5, {'failed': 0.0, 'downloaded': 0.2, 'total': 2.0, 'released': 0.0}), + ], + [i for i in rdb.get(ts, ts + 10)]) + + self.ts += 6 + stats = Sniffer(volume, 'stats') + + rdb = Rrd('stats', 1)['context'] + self.assertEqual([ + (ts + 0, {'failed': 0.0, 'downloaded': 0.0, 'total': 0.0, 'released': 0.0}), + (ts + 5, {'failed': 0.0, 'downloaded': 0.2, 'total': 2.0, 'released': 0.0}), + (ts + 10, {'failed': 0.0, 'downloaded': 0.4, 'total': 3.0, 'released': 0.0}), + ], + [i for i in rdb.get(ts, ts + 20)]) + if __name__ == '__main__': tests.main() -- cgit v0.9.1