Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
path: root/sugar_network/node/stats.py
diff options
Diffstat (limited to 'sugar_network/node/stats.py')
1 files changed, 269 insertions, 0 deletions
diff --git a/sugar_network/node/stats.py b/sugar_network/node/stats.py
new file mode 100644
index 0000000..d1b0d63
--- /dev/null
+++ b/sugar_network/node/stats.py
@@ -0,0 +1,269 @@
+# Copyright (C) 2014 Aleksey Lim
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import time
+import logging
+from sugar_network.toolkit.rrd import Rrd
+from sugar_network.toolkit.router import route, postroute, Request
+from sugar_network.toolkit.coroutine import this
+from sugar_network.toolkit import Option, coroutine, enforce
+stats = Option(
+ 'collect unpersonalized node statistics',
+ default=False, type_cast=Option.bool_cast, action='store_true')
+stats_step = Option(
+ 'step interval in seconds for RRD statistics database',
+ default=60 * 5, type_cast=int)
+stats_rras = Option(
+ 'comma separated list of RRAs for RRD statistics database',
+ default=[
+ 'RRA:AVERAGE:0.5:1:864', # 3d with 5min step
+ 'RRA:AVERAGE:0.5:288:3660', # 10y with 1d step
+ 'RRA:AVERAGE:0.5:2880:366', # 10y with 10d step
+ 'RRA:AVERAGE:0.5:8640:122', # 10y with 30d step
+ 'RRA:AVERAGE:0.5:105120:10', # 10y with 1y step
+ ],
+ type_cast=Option.list_cast, type_repr=Option.list_repr)
+_HEARTBEAT_EVER = 60 * 60 * 24 * 365
+_DS = {
+ 'contexts': {
+ 'type': 'GAUGE',
+ 'heartbeat': _HEARTBEAT_EVER,
+ 'resource': 'context',
+ 'query': {},
+ },
+ 'released': {
+ 'type': 'ABSOLUTE',
+ },
+ 'solved': {
+ 'type': 'ABSOLUTE',
+ },
+ 'reported': {
+ 'type': 'ABSOLUTE',
+ },
+ 'topics': {
+ 'type': 'GAUGE',
+ 'heartbeat': _HEARTBEAT_EVER,
+ 'resource': 'user',
+ 'query': {'topic': ''},
+ },
+ 'posts': {
+ 'type': 'GAUGE',
+ 'heartbeat': _HEARTBEAT_EVER,
+ 'resource': 'user',
+ 'query': {'not_topic': ''},
+ },
+ 'users': {
+ 'type': 'GAUGE',
+ 'heartbeat': _HEARTBEAT_EVER,
+ 'resource': 'user',
+ 'query': {},
+ },
+ }
+_ROUTES = {
+ ('POST', 'context', None, None):
+ ('contexts', +1),
+ ('DELETE', 'context', None, None):
+ ('contexts', -1),
+ ('POST', 'context', 'releases', None):
+ ('released', +1),
+ ('GET', 'context', None, 'solve'):
+ ('solved', +1),
+ ('POST', 'report', None, None):
+ ('reported', +1),
+ ('POST', 'post', None, None):
+ (lambda: 'posts' if this.resource['topic'] else 'topics', +1),
+ ('DELETE', 'post', None, None):
+ (lambda: 'posts' if this.resource['topic'] else 'topics', -1),
+ ('POST', 'user', None, None):
+ ('users', +1),
+ ('DELETE', 'user', None, None):
+ ('users', -1),
+ }
+_logger = logging.getLogger('node.stats')
+class StatRoutes(object):
+ _rrd = None
+ _stats = None
+ _rating = None
+ _stated = False
+ def stats_init(self, path, step, rras):
+ _logger.info('Collect node stats in %r', path)
+ self._rrd = Rrd(path, 'stats', _DS, step, rras)
+ self._stats = self._rrd.values()
+ self._rating = {'context': {}, 'post': {}}
+ if not self._stats:
+ for field, traits in _DS.items():
+ value = 0
+ if traits['type'] == 'GAUGE':
+ directory = this.volume[traits['resource']]
+ __, value = directory.find(limit=0, **traits['query'])
+ self._stats[field] = value
+ @postroute
+ def stat_on_postroute(self, result, exception, stat_rating=True):
+ if self._rrd is None or exception is not None:
+ return result
+ r = this.request
+ route_ = _ROUTES.get((r.method, r.resource, r.prop, r.cmd))
+ if route_ is None:
+ return result
+ stat, shift = route_
+ self._stated = True
+ if not isinstance(stat, basestring):
+ stat = stat()
+ self._stats[stat] += shift
+ if stat_rating and r.method == 'POST' and r.resource == 'post':
+ rating = None
+ if stat == 'topics' and this.resource['type'] == 'review':
+ rating = self._rating['context']
+ rating = rating.setdefault(this.resource['context'], [0, 0])
+ else:
+ rating = self._rating['post']
+ rating = rating.setdefault(this.resource['topic'], [0, 0])
+ if rating is not None:
+ rating[0] += shift
+ rating[1] += shift * this.resource['vote']
+ return result
+ @route('GET', cmd='stats', arguments={
+ 'start': int, 'end': int, 'records': int, 'source': list},
+ mime_type='application/json')
+ def stats(self, start, end, records, source):
+ enforce(self._rrd is not None, 'Statistics disabled')
+ if not start:
+ start = self._rrd.first or 0
+ if not end:
+ end = self._rrd.last or 0
+ if records > _MAX_STAT_RECORDS:
+ _logger.debug('Decrease %d stats records number to %d',
+ records, _MAX_STAT_RECORDS)
+ records = _MAX_STAT_RECORDS
+ elif records <= 0:
+ records = _MAX_STAT_RECORDS / 10
+ resolution = max(1, (end - start) / records)
+ result = []
+ for ts, values in self._rrd.get(start, end, resolution):
+ if source:
+ values = dict([(i, values[i]) for i in source])
+ result.append((ts, values))
+ return result
+ def stats_auto_commit(self):
+ while True:
+ coroutine.sleep(self._rrd.step)
+ self.stats_commit()
+ def stats_commit(self, timestamp=None):
+ if not self._stated:
+ return
+ self._stated = False
+ _logger.trace('Commit stats')
+ self._rrd.put(self._stats, timestamp)
+ for field, traits in _DS.items():
+ if traits['type'] == 'ABSOLUTE':
+ self._stats[field] = 0
+ for resource, stats_ in self._rating.items():
+ directory = this.volume[resource]
+ for guid, (votes, reviews) in stats_.items():
+ rating = directory[guid]['rating']
+ directory.update(guid, {
+ 'rating': [rating[0] + votes, rating[1] + reviews],
+ })
+ stats_.clear()
+ def stats_regen(self, path, step, rras):
+ for i in Rrd(path, 'stats', _DS, step, rras).files:
+ os.unlink(i)
+ self.stats_init(path, step, rras)
+ for field in self._stats:
+ self._stats[field] = 0
+ def timeline(ts):
+ ts = long(ts)
+ end = long(time.time())
+ step_ = None
+ archives = {}
+ for rra in rras:
+ a_step, a_size = [long(i) for i in rra.split(':')[-2:]]
+ a_step *= step
+ a_start = end - min(end, a_step * a_size)
+ if archives.setdefault(a_start, a_step) > a_step:
+ archives[a_start] = a_step
+ archives = list(sorted(archives.items()))
+ while ts <= end:
+ while not step_ or archives and ts >= archives[0][0]:
+ archive_start, step_ = archives.pop(0)
+ ts = max(ts / step_ * step_, archive_start)
+ yield ts, ts + step_ - 1, step_
+ ts += step_
+ items, __ = this.volume['context'].find(limit=1, order_by='ctime')
+ start = next(items)['ctime']
+ for left, right, __ in timeline(start):
+ for resource in ('user', 'context', 'post', 'report'):
+ items, __ = this.volume[resource].find(
+ query='ctime:%s..%s' % (left, right))
+ for this.resource in items:
+ this.request = Request(method='POST', path=[resource])
+ self.stat_on_postroute(None, None, False)
+ self.stats_commit(left + (right - left) / 2)
+ def stats_regen_rating(self, path, step, rras):
+ def calc_rating(**kwargs):
+ rating = [0, 0]
+ alldocs, __ = this.volume['post'].find(**kwargs)
+ for post in alldocs:
+ rating[0] += 1
+ rating[1] += post['vote']
+ return rating
+ alldocs, __ = this.volume['context'].find()
+ for context in alldocs:
+ rating = calc_rating(type='review', context=context.guid)
+ this.volume['context'].update(context.guid, {'rating': rating})
+ alldocs, __ = this.volume['post'].find(topic='')
+ for topic in alldocs:
+ rating = calc_rating(topic=topic.guid)
+ this.volume['post'].update(topic.guid, {'rating': rating})