From e0008f8b1cd2ccb1e044dfbabfa30c54d07d71fa Mon Sep 17 00:00:00 2001 From: Aleksey Lim Date: Mon, 12 May 2014 15:12:24 +0000 Subject: Revert node statistics --- (limited to 'sugar_network/node/stats.py') diff --git a/sugar_network/node/stats.py b/sugar_network/node/stats.py new file mode 100644 index 0000000..d1b0d63 --- /dev/null +++ b/sugar_network/node/stats.py @@ -0,0 +1,269 @@ +# Copyright (C) 2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import time
import logging

from sugar_network.toolkit.rrd import Rrd
from sugar_network.toolkit.router import route, postroute, Request
from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import Option, coroutine, enforce


stats = Option(
        'collect unpersonalized node statistics',
        default=False, type_cast=Option.bool_cast, action='store_true')

stats_step = Option(
        'step interval in seconds for RRD statistics database',
        default=60 * 5, type_cast=int)

stats_rras = Option(
        'comma separated list of RRAs for RRD statistics database',
        default=[
            'RRA:AVERAGE:0.5:1:864',        # 3d with 5min step
            'RRA:AVERAGE:0.5:288:3660',     # 10y with 1d step
            'RRA:AVERAGE:0.5:2880:366',     # 10y with 10d step
            'RRA:AVERAGE:0.5:8640:122',     # 10y with 30d step
            'RRA:AVERAGE:0.5:105120:10',    # 10y with 1y step
            ],
        type_cast=Option.list_cast, type_repr=Option.list_repr)

# One year in seconds; used as the heartbeat of every GAUGE data source
_HEARTBEAT_EVER = 60 * 60 * 24 * 365

# Data sources stored in the RRD database.
#
# GAUGE sources carry a `resource` and a `query`; when the database has no
# values yet, `StatRoutes.stats_init()` seeds them by running `query`
# against the `resource` directory. ABSOLUTE sources start from zero and are
# reset to zero after every commit.
_DS = {
    'contexts': {
        'type': 'GAUGE',
        'heartbeat': _HEARTBEAT_EVER,
        'resource': 'context',
        'query': {},
        },
    'released': {
        'type': 'ABSOLUTE',
        },
    'solved': {
        'type': 'ABSOLUTE',
        },
    'reported': {
        'type': 'ABSOLUTE',
        },
    'topics': {
        'type': 'GAUGE',
        'heartbeat': _HEARTBEAT_EVER,
        # Topics are posts with an empty `topic`, so they have to be counted
        # in the `post` directory, the same resource `_ROUTES` tracks
        'resource': 'post',
        'query': {'topic': ''},
        },
    'posts': {
        'type': 'GAUGE',
        'heartbeat': _HEARTBEAT_EVER,
        # Replies are posts with a non-empty `topic`; see the note on
        # `topics` above
        'resource': 'post',
        'query': {'not_topic': ''},
        },
    'users': {
        'type': 'GAUGE',
        'heartbeat': _HEARTBEAT_EVER,
        'resource': 'user',
        'query': {},
        },
    }

# Maps a processed request, keyed by (method, resource, prop, cmd), to the
# data source it affects and the amount to shift it by. The data source is
# either a `_DS` key or a callable returning one (evaluated against
# `this.resource` at request time).
_ROUTES = {
    ('POST', 'context', None, None):
        ('contexts', +1),
    ('DELETE', 'context', None, None):
        ('contexts', -1),
    ('POST', 'context', 'releases', None):
        ('released', +1),
    ('GET', 'context', None, 'solve'):
        ('solved', +1),
    ('POST', 'report', None, None):
        ('reported', +1),
    ('POST', 'post', None, None):
        (lambda: 'posts' if this.resource['topic'] else 'topics', +1),
    ('DELETE', 'post', None, None):
        (lambda: 'posts' if this.resource['topic'] else 'topics', -1),
    ('POST', 'user', None, None):
        ('users', +1),
    ('DELETE', 'user', None, None):
        ('users', -1),
    }

# Upper limit for the `records` argument of the `stats` command
_MAX_STAT_RECORDS = 100

_logger = logging.getLogger('node.stats')


class StatRoutes(object):
    """Routes mixin that collects node statistics into an RRD database.

    Statistics stay disabled (every hook is a no-op and the `stats` command
    is refused) until `stats_init()` was called.

    """

    # `Rrd` database object; None while statistics are disabled
    _rrd = None
    # Current value of every `_DS` data source
    _stats = None
    # Not yet committed rating deltas, {resource: {guid: [count, votes]}}
    _rating = None
    # Whether anything was counted since the last commit
    _stated = False

    def stats_init(self, path, step, rras):
        """Open the statistics database and load current values.

        If the database returns no values, GAUGE data sources are seeded
        from the volume and ABSOLUTE ones start from zero.

        :param path:
            location passed to `Rrd`
        :param step:
            RRD step interval in seconds
        :param rras:
            list of RRA definitions

        """
        _logger.info('Collect node stats in %r', path)

        self._rrd = Rrd(path, 'stats', _DS, step, rras)
        self._stats = self._rrd.values()
        self._rating = {'context': {}, 'post': {}}

        if not self._stats:
            for field, traits in _DS.items():
                value = 0
                if traits['type'] == 'GAUGE':
                    directory = this.volume[traits['resource']]
                    # Only the second element of the find() result is
                    # needed, it is used as the number of matching documents
                    __, value = directory.find(limit=0, **traits['query'])
                self._stats[field] = value

    @postroute
    def stat_on_postroute(self, result, exception, stat_rating=True):
        """Count the just processed request, if `_ROUTES` tracks it.

        Failed requests are not counted.

        :param result:
            request result, returned untouched
        :param exception:
            exception raised while processing the request, if any
        :param stat_rating:
            whether to accumulate rating deltas for newly created posts;
            `stats_regen()` passes False
        :returns:
            `result`

        """
        if self._rrd is None or exception is not None:
            return result

        r = this.request
        route_ = _ROUTES.get((r.method, r.resource, r.prop, r.cmd))
        if route_ is None:
            return result
        stat, shift = route_
        self._stated = True

        if not isinstance(stat, basestring):
            stat = stat()
        self._stats[stat] += shift

        if stat_rating and r.method == 'POST' and r.resource == 'post':
            rating = None
            if stat == 'topics' and this.resource['type'] == 'review':
                # A review topic votes for its context
                rating = self._rating['context']
                rating = rating.setdefault(this.resource['context'], [0, 0])
            elif stat == 'posts':
                # A reply votes for its topic. Topics that are not reviews
                # have an empty `topic` and must not produce a rating entry,
                # otherwise stats_commit() would look up an empty guid
                rating = self._rating['post']
                rating = rating.setdefault(this.resource['topic'], [0, 0])
            if rating is not None:
                rating[0] += shift
                rating[1] += shift * this.resource['vote']

        return result

    @route('GET', cmd='stats', arguments={
        'start': int, 'end': int, 'records': int, 'source': list},
        mime_type='application/json')
    def stats(self, start, end, records, source):
        """Return collected statistics for the requested time range.

        :param start:
            beginning of the range; if not set, `self._rrd.first` is used
        :param end:
            end of the range; if not set, `self._rrd.last` is used
        :param records:
            preferred number of records; capped by `_MAX_STAT_RECORDS`,
            non-positive values fall back to a tenth of that limit
        :param source:
            if set, only these data sources are returned
        :returns:
            list of `(timestamp, values)` tuples

        """
        enforce(self._rrd is not None, 'Statistics disabled')

        if not start:
            start = self._rrd.first or 0
        if not end:
            end = self._rrd.last or 0
        if records > _MAX_STAT_RECORDS:
            _logger.debug('Decrease %d stats records number to %d',
                    records, _MAX_STAT_RECORDS)
            records = _MAX_STAT_RECORDS
        elif records <= 0:
            records = _MAX_STAT_RECORDS // 10
        resolution = max(1, (end - start) // records)

        result = []
        for ts, values in self._rrd.get(start, end, resolution):
            if source:
                values = dict((i, values[i]) for i in source)
            result.append((ts, values))
        return result

    def stats_auto_commit(self):
        """Commit statistics every RRD step; never returns."""
        while True:
            coroutine.sleep(self._rrd.step)
            self.stats_commit()

    def stats_commit(self, timestamp=None):
        """Store current values in the database and apply pending ratings.

        Does nothing if no tracked request happened since the last commit.

        :param timestamp:
            passed to `Rrd.put()` together with the values

        """
        if not self._stated:
            return
        self._stated = False

        _logger.trace('Commit stats')

        self._rrd.put(self._stats, timestamp)
        # ABSOLUTE data sources restart from zero after every commit
        for field, traits in _DS.items():
            if traits['type'] == 'ABSOLUTE':
                self._stats[field] = 0

        for resource, stats_ in self._rating.items():
            directory = this.volume[resource]
            for guid, (count, votes) in stats_.items():
                rating = directory[guid]['rating']
                directory.update(guid, {
                    'rating': [rating[0] + count, rating[1] + votes],
                    })
            stats_.clear()

    def stats_regen(self, path, step, rras):
        """Recreate the statistics database from documents in the volume.

        Existing database files are removed, then documents are replayed
        as `POST` requests in `ctime` order, one commit per time slot.
        Ratings are not touched. If the volume has no contexts, the result
        is an initialized database with zeroed values.

        :param path:
            location passed to `Rrd`
        :param step:
            RRD step interval in seconds
        :param rras:
            list of RRA definitions

        """
        for i in Rrd(path, 'stats', _DS, step, rras).files:
            os.unlink(i)
        self.stats_init(path, step, rras)
        for field in self._stats:
            self._stats[field] = 0

        def timeline(ts):
            # Yield (left, right, step) time slots from `ts` till now,
            # switching to a smaller step once `ts` reaches the start
            # of an archive
            ts = long(ts)
            end = long(time.time())
            step_ = None

            # {archive start: archive step}; the smallest step wins when
            # several RRAs start at the same time
            archives = {}
            for rra in rras:
                a_step, a_size = [long(i) for i in rra.split(':')[-2:]]
                a_step *= step
                a_start = end - min(end, a_step * a_size)
                if archives.setdefault(a_start, a_step) > a_step:
                    archives[a_start] = a_step
            archives = sorted(archives.items())

            while ts <= end:
                while not step_ or (archives and ts >= archives[0][0]):
                    archive_start, step_ = archives.pop(0)
                    # Align to the step boundary
                    ts = max(ts // step_ * step_, archive_start)
                yield ts, ts + step_ - 1, step_
                ts += step_

        items, __ = this.volume['context'].find(limit=1, order_by='ctime')
        first = next(iter(items), None)
        if first is None:
            # No contexts, there is no starting point to replay from
            return
        start = first['ctime']
        for left, right, __ in timeline(start):
            for resource in ('user', 'context', 'post', 'report'):
                items, __ = this.volume[resource].find(
                        query='ctime:%s..%s' % (left, right))
                for this.resource in items:
                    this.request = Request(method='POST', path=[resource])
                    self.stat_on_postroute(None, None, False)
            self.stats_commit(left + (right - left) // 2)

    def stats_regen_rating(self, path, step, rras):
        """Recalculate `rating` of all contexts and topics from posts.

        Every rating is a `[count, votes]` pair: the number of matching
        posts and the sum of their `vote` values. Contexts are rated by
        their review posts, topics by their replies.

        The arguments are not used.

        """

        def calc_rating(**kwargs):
            rating = [0, 0]
            alldocs, __ = this.volume['post'].find(**kwargs)
            for post in alldocs:
                rating[0] += 1
                rating[1] += post['vote']
            return rating

        alldocs, __ = this.volume['context'].find()
        for context in alldocs:
            rating = calc_rating(type='review', context=context.guid)
            this.volume['context'].update(context.guid, {'rating': rating})

        alldocs, __ = this.volume['post'].find(topic='')
        for topic in alldocs:
            rating = calc_rating(topic=topic.guid)
            this.volume['post'].update(topic.guid, {'rating': rating})