diff options
Diffstat (limited to 'sugar_network')
-rw-r--r-- | sugar_network/db/routes.py | 6 | ||||
-rw-r--r-- | sugar_network/model/post.py | 14 | ||||
-rw-r--r-- | sugar_network/node/model.py | 22 | ||||
-rw-r--r-- | sugar_network/node/routes.py | 4 | ||||
-rw-r--r-- | sugar_network/node/stats.py | 269 | ||||
-rw-r--r-- | sugar_network/toolkit/router.py | 14 | ||||
-rw-r--r-- | sugar_network/toolkit/rrd.py | 250 |
7 files changed, 539 insertions, 40 deletions
diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py index 22e3782..221d066 100644 --- a/sugar_network/db/routes.py +++ b/sugar_network/db/routes.py @@ -48,7 +48,8 @@ class Routes(object): authors = doc.posts['author'] = {} self._useradd(authors, this.principal, Author.ORIGINAL) self.volume[this.request.resource].create(doc.posts) - return doc['guid'] + this.request.guid = doc.guid + return doc.guid @route('PUT', [None, None], acl=ACL.AUTH | ACL.AUTHOR) def update(self): @@ -194,7 +195,6 @@ class Routes(object): 'GUID in cannot be changed') doc = self.volume[this.request.resource][this.request.guid] enforce(doc.available, 'Resource not found') - this.resource = doc def teardown(new, old): for name, value in new.items(): @@ -254,7 +254,7 @@ class Routes(object): def _aggpost(self, acl): request = this.request - doc = this.resource = self.volume[request.resource][request.guid] + doc = self.volume[request.resource][request.guid] prop = doc.metadata[request.prop] enforce(isinstance(prop, Aggregated), http.BadRequest, 'Property is not aggregated') diff --git a/sugar_network/model/post.py b/sugar_network/model/post.py index e0b3b25..a4c7dbf 100644 --- a/sugar_network/model/post.py +++ b/sugar_network/model/post.py @@ -15,7 +15,6 @@ from sugar_network import db, model from sugar_network.toolkit.router import ACL -from sugar_network.toolkit.coroutine import this class Post(db.Resource): @@ -52,19 +51,6 @@ class Post(db.Resource): def vote(self, value): return value - @vote.setter - def vote(self, value): - if value: - if self['topic']: - resource = this.volume['post'] - guid = self['topic'] - else: - resource = this.volume['context'] - guid = self['context'] - orig = resource[guid]['rating'] - resource.update(guid, {'rating': [orig[0] + 1, orig[1] + value]}) - return value - @db.indexed_property(db.Aggregated, prefix='D', full_text=True, subtype=db.Localized()) def comments(self, value): diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py index 87abed6..48fa5a3 100644 --- a/sugar_network/node/model.py +++ b/sugar_network/node/model.py @@ -616,28 +616,6 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None, return context, release -def generate_node_stats(volume): - - def calc_rating(**kwargs): - rating = [0, 0] - alldocs, __ = volume['post'].find(**kwargs) - for post in alldocs: - if post['vote']: - rating[0] += 1 - rating[1] += post['vote'] - return rating - - alldocs, __ = volume['context'].find() - for context in alldocs: - rating = calc_rating(type='review', context=context.guid) - volume['context'].update(context.guid, {'rating': rating}) - - alldocs, __ = volume['post'].find(topic='') - for topic in alldocs: - rating = calc_rating(type='feedback', topic=topic.guid) - volume['post'].update(topic.guid, {'rating': rating}) - - def _load_context_metadata(bundle, spec): result = {} for prop in ('homepage', 'mime_types'): diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py index 45e6e20..e5887bc 100644 --- a/sugar_network/node/routes.py +++ b/sugar_network/node/routes.py @@ -25,6 +25,7 @@ from os.path import join, exists from sugar_network import db, toolkit from sugar_network.model import FrontRoutes from sugar_network.node import model +from sugar_network.node.stats import StatRoutes from sugar_network.toolkit.router import ACL, File, route from sugar_network.toolkit.router import fallbackroute, preroute, postroute from sugar_network.toolkit.spec import parse_version @@ -38,11 +39,12 @@ _GUID_RE = re.compile('[a-zA-Z0-9_+-.]+$') _logger = logging.getLogger('node.routes') -class NodeRoutes(db.Routes, FrontRoutes): +class NodeRoutes(db.Routes, FrontRoutes, StatRoutes): def __init__(self, guid, auth=None, **kwargs): db.Routes.__init__(self, **kwargs) FrontRoutes.__init__(self) + StatRoutes.__init__(self) self._guid = guid self._auth = auth self._batch_dir = join(self.volume.root, 'batch') diff --git a/sugar_network/node/stats.py b/sugar_network/node/stats.py new file mode 100644 index 0000000..d1b0d63 --- /dev/null +++ b/sugar_network/node/stats.py @@ -0,0 +1,269 @@ +# Copyright (C) 2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import time +import logging + +from sugar_network.toolkit.rrd import Rrd +from sugar_network.toolkit.router import route, postroute, Request +from sugar_network.toolkit.coroutine import this +from sugar_network.toolkit import Option, coroutine, enforce + + +stats = Option( + 'collect unpersonalized node statistics', + default=False, type_cast=Option.bool_cast, action='store_true') + +stats_step = Option( + 'step interval in seconds for RRD statistics database', + default=60 * 5, type_cast=int) + +stats_rras = Option( + 'comma separated list of RRAs for RRD statistics database', + default=[ + 'RRA:AVERAGE:0.5:1:864', # 3d with 5min step + 'RRA:AVERAGE:0.5:288:3660', # 10y with 1d step + 'RRA:AVERAGE:0.5:2880:366', # 10y with 10d step + 'RRA:AVERAGE:0.5:8640:122', # 10y with 30d step + 'RRA:AVERAGE:0.5:105120:10', # 10y with 1y step + ], + type_cast=Option.list_cast, type_repr=Option.list_repr) + +_HEARTBEAT_EVER = 60 * 60 * 24 * 365 + +_DS = { + 'contexts': { + 'type': 'GAUGE', + 'heartbeat': _HEARTBEAT_EVER, + 'resource': 'context', + 'query': {}, + }, + 'released': { + 'type': 'ABSOLUTE', + }, + 'solved': { + 'type': 'ABSOLUTE', + }, + 'reported': { + 'type': 'ABSOLUTE', + }, + 'topics': { + 'type': 'GAUGE', + 'heartbeat': _HEARTBEAT_EVER, + 'resource': 'user', + 'query': {'topic': ''}, + }, + 'posts': { + 'type': 'GAUGE', + 'heartbeat': _HEARTBEAT_EVER, + 'resource': 'user', + 'query': {'not_topic': ''}, + }, + 'users': { + 'type': 'GAUGE', + 'heartbeat': _HEARTBEAT_EVER, + 'resource': 'user', + 'query': {}, + }, + } + +_ROUTES = { + ('POST', 'context', None, None): + ('contexts', +1), + ('DELETE', 'context', None, None): + ('contexts', -1), + ('POST', 'context', 'releases', None): + ('released', +1), + ('GET', 'context', None, 'solve'): + ('solved', +1), + ('POST', 'report', None, None): + ('reported', +1), + ('POST', 'post', None, None): + (lambda: 'posts' if this.resource['topic'] else 'topics', +1), + ('DELETE', 'post', None, None): + (lambda: 'posts' if this.resource['topic'] else 'topics', -1), + ('POST', 'user', None, None): + ('users', +1), + ('DELETE', 'user', None, None): + ('users', -1), + } + +_MAX_STAT_RECORDS = 100 + +_logger = logging.getLogger('node.stats') + + +class StatRoutes(object): + + _rrd = None + _stats = None + _rating = None + _stated = False + + def stats_init(self, path, step, rras): + _logger.info('Collect node stats in %r', path) + + self._rrd = Rrd(path, 'stats', _DS, step, rras) + self._stats = self._rrd.values() + self._rating = {'context': {}, 'post': {}} + + if not self._stats: + for field, traits in _DS.items(): + value = 0 + if traits['type'] == 'GAUGE': + directory = this.volume[traits['resource']] + __, value = directory.find(limit=0, **traits['query']) + self._stats[field] = value + + @postroute + def stat_on_postroute(self, result, exception, stat_rating=True): + if self._rrd is None or exception is not None: + return result + + r = this.request + route_ = _ROUTES.get((r.method, r.resource, r.prop, r.cmd)) + if route_ is None: + return result + stat, shift = route_ + self._stated = True + + if not isinstance(stat, basestring): + stat = stat() + self._stats[stat] += shift + + if stat_rating and r.method == 'POST' and r.resource == 'post': + rating = None + if stat == 'topics' and this.resource['type'] == 'review': + rating = self._rating['context'] + rating = rating.setdefault(this.resource['context'], [0, 0]) + else: + rating = self._rating['post'] + rating = rating.setdefault(this.resource['topic'], [0, 0]) + if rating is not None: + rating[0] += shift + rating[1] += shift * this.resource['vote'] + + return result + + @route('GET', cmd='stats', arguments={ + 'start': int, 'end': int, 'records': int, 'source': list}, + mime_type='application/json') + def stats(self, start, end, records, source): + enforce(self._rrd is not None, 'Statistics disabled') + + if not start: + start = self._rrd.first or 0 + if not end: + end = self._rrd.last or 0 + if records > _MAX_STAT_RECORDS: + _logger.debug('Decrease %d stats records number to %d', + records, _MAX_STAT_RECORDS) + records = _MAX_STAT_RECORDS + elif records <= 0: + records = _MAX_STAT_RECORDS / 10 + resolution = max(1, (end - start) / records) + + result = [] + for ts, values in self._rrd.get(start, end, resolution): + if source: + values = dict([(i, values[i]) for i in source]) + result.append((ts, values)) + return result + + def stats_auto_commit(self): + while True: + coroutine.sleep(self._rrd.step) + self.stats_commit() + + def stats_commit(self, timestamp=None): + if not self._stated: + return + self._stated = False + + _logger.trace('Commit stats') + + self._rrd.put(self._stats, timestamp) + for field, traits in _DS.items(): + if traits['type'] == 'ABSOLUTE': + self._stats[field] = 0 + + for resource, stats_ in self._rating.items(): + directory = this.volume[resource] + for guid, (votes, reviews) in stats_.items(): + rating = directory[guid]['rating'] + directory.update(guid, { + 'rating': [rating[0] + votes, rating[1] + reviews], + }) + stats_.clear() + + def stats_regen(self, path, step, rras): + for i in Rrd(path, 'stats', _DS, step, rras).files: + os.unlink(i) + self.stats_init(path, step, rras) + for field in self._stats: + self._stats[field] = 0 + + def timeline(ts): + ts = long(ts) + end = long(time.time()) + step_ = None + + archives = {} + for rra in rras: + a_step, a_size = [long(i) for i in rra.split(':')[-2:]] + a_step *= step + a_start = end - min(end, a_step * a_size) + if archives.setdefault(a_start, a_step) > a_step: + archives[a_start] = a_step + archives = list(sorted(archives.items())) + + while ts <= end: + while not step_ or archives and ts >= archives[0][0]: + archive_start, step_ = archives.pop(0) + ts = max(ts / step_ * step_, archive_start) + yield ts, ts + step_ - 1, step_ + ts += step_ + + items, __ = this.volume['context'].find(limit=1, order_by='ctime') + start = next(items)['ctime'] + for left, right, __ in timeline(start): + for resource in ('user', 'context', 'post', 'report'): + items, __ = this.volume[resource].find( + query='ctime:%s..%s' % (left, right)) + for this.resource in items: + this.request = Request(method='POST', path=[resource]) + self.stat_on_postroute(None, None, False) + self.stats_commit(left + (right - left) / 2) + + def stats_regen_rating(self, path, step, rras): + + def calc_rating(**kwargs): + rating = [0, 0] + alldocs, __ = this.volume['post'].find(**kwargs) + for post in alldocs: + rating[0] += 1 + rating[1] += post['vote'] + return rating + + alldocs, __ = this.volume['context'].find() + for context in alldocs: + rating = calc_rating(type='review', context=context.guid) + this.volume['context'].update(context.guid, {'rating': rating}) + + alldocs, __ = this.volume['post'].find(topic='') + for topic in alldocs: + rating = calc_rating(topic=topic.guid) + this.volume['post'].update(topic.guid, {'rating': rating}) diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 6e3b43b..4470767 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -168,6 +168,10 @@ class Request(dict): def method(self): return self.environ.get('REQUEST_METHOD') + @method.setter + def method(self, value): + self.environ['REQUEST_METHOD'] = value + @property def url(self): result = self.environ['PATH_INFO'] @@ -223,6 +227,8 @@ class Request(dict): @resource.setter def resource(self, value): + while len(self.path) < 1: + self.path.append(None) self.path[0] = value @property @@ -232,6 +238,8 @@ class Request(dict): @guid.setter def guid(self, value): + while len(self.path) < 2: + self.path.append(None) self.path[1] = value @property @@ -241,6 +249,8 @@ class Request(dict): @prop.setter def prop(self, value): + while len(self.path) < 3: + self.path.append(None) self.path[2] = value @property @@ -250,6 +260,8 @@ class Request(dict): @key.setter def key(self, value): + while len(self.path) < 4: + self.path.append(None) self.path[3] = value @property @@ -535,6 +547,8 @@ class Router(object): # To populate `exception` only raise finally: + this.request = request + this.response = response for i in self._postroutes: result = i(result, exception) diff --git a/sugar_network/toolkit/rrd.py b/sugar_network/toolkit/rrd.py new file mode 100644 index 0000000..d8386e5 --- /dev/null +++ b/sugar_network/toolkit/rrd.py @@ -0,0 +1,250 @@ +# Copyright (C) 2012-2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Convenient access to RRD databases.""" + +import re +import os +import time +import bisect +import logging +from os.path import exists, join, splitext, basename + +import rrdtool + +from . import Bin + + +_DB_FILENAME_RE = re.compile('(.*?)(-[0-9]+){0,1}\\.rrd$') +_INFO_RE = re.compile('([^[]+)\\[([^]]+)\\]\\.(.*)$') + +_FETCH_PAGE = 256 + +_logger = logging.getLogger('rrd') + + +class Rrd(object): + + def __init__(self, root, name, fields, step, rras): + self._root = root + self.name = name + self.step = step + self._fields = fields or {} + # rrdtool knows nothing about `unicode` + self._rras = [i.encode('utf8') for i in rras or []] + self._revisions = [] + self._db = None + + _logger.debug('[%s] open rrd at %r', self.name, root) + + if not exists(self._root): + os.makedirs(self._root) + + for filename in os.listdir(self._root): + match = _DB_FILENAME_RE.match(filename) + if match is None: + continue + name_, revision = match.groups() + if name_ == name: + self._load(filename, int(revision or 0)) + + @property + def files(self): + for rev in self._revisions: + yield rev.path + + @property + def first(self): + return self._revisions[0].first if self._revisions else None + + @property + def last(self): + return self._revisions[-1].last if self._revisions else None + + def values(self, timestamp=None): + return self._revisions[-1].values(timestamp) if self._revisions else {} + + def put(self, values, timestamp=None): + if not timestamp: + timestamp = int(time.time()) + timestamp = timestamp / self.step * self.step + _logger.trace('[%s] put %r', self.name, values) + self._get_db(timestamp, values).put(values, timestamp) + + def get(self, start=None, end=None, resolution=None): + if not self._revisions: + return + + if not resolution: + resolution = self.step + + if start is None: + start = self._revisions[0].first + if end is None: + end = self._revisions[-1].last + + revisions = [] + for db in reversed(self._revisions): + revisions.append(db) + if db.last <= start: + break + + start = start - start % self.step - self.step + last = min(end, start + _FETCH_PAGE * resolution) + last -= last % self.step + self.step + + for db in reversed(revisions): + db_end = min(last, db.last - self.step) + if start > db_end: + break + for ts, row in db.get(start, db_end, resolution): + if ts > end: + break + yield ts, row + start = db_end + 1 + + def _get_db(self, timestamp, values): + if self._db is not None: + return self._db + + fields = [] + for field in sorted(values.keys()): + ds = self._fields.get(field) or {} + ds_type = ds.get('type') or 'GAUGE' + ds_heartbeat = ds.get('heartbeat') or self.step * 2 + fields.append('DS:%s:%s:%s:U:U' % (field, ds_type, ds_heartbeat)) + _logger.debug('[%s] fields from jut values: %r', self.name, fields) + + if not self._revisions: + self._db = self._create_db(0, fields, timestamp) + else: + db = self._revisions[-1] + if db.fields == fields and db.rras == self._rras: + self._db = db + else: + self._db = self._create_db(db.revision + 1, fields, db.last) + + return self._db + + def _create_db(self, revision, fields, timestamp): + filename = self.name + if revision: + filename += '-%s' % revision + filename += '.rrd' + _logger.debug('[%s] create database filename=%s start=%s step=%s', + self.name, filename, timestamp, self.step) + rrdtool.create( + str(join(self._root, filename)), + '--start', str(timestamp - self.step), + '--step', str(self.step), + *(fields + self._rras)) + return self._load(filename, revision) + + def _load(self, filename, revision): + _logger.debug('[%s] load database filename=%s revision=%s', + self.name, filename, revision) + db = _Db(join(self._root, filename), revision) + bisect.insort(self._revisions, db) + return db + + +class _Db(object): + + def __init__(self, path, revision=0): + self.path = str(path) + basepath = splitext(path)[0] + self.name = basename(basepath) + self._meta = Bin(basepath + '.meta', {}) + self.revision = revision + self.fields = [] + self.field_names = [] + self.rras = [] + + info = rrdtool.info(self.path) + self.step = info['step'] + self.last = info['last_update'] + + fields = {} + rras = {} + for key, value in info.items(): + match = _INFO_RE.match(key) + if match is None: + continue + prefix, key, prop = match.groups() + if prefix == 'ds': + fields.setdefault(key, {}) + fields[key][prop] = value + if prefix == 'rra': + rras.setdefault(key, {}) + rras[key][prop] = value + for index in sorted([int(i) for i in rras.keys()]): + rra = rras[str(index)] + self.rras.append( + 'RRA:%(cf)s:%(xff)s:%(pdp_per_row)s:%(rows)s' % rra) + for name in sorted(fields.keys()): + ds = fields[name] + self.fields.append('DS:%s:%s:%s:U:U' % + (name, ds['type'], ds['minimal_heartbeat'])) + self.field_names.append(name) + + @property + def first(self): + return self._meta['first'] or 0 + + def values(self, timestamp): + if timestamp and timestamp - self.last <= self.step and \ + 'pending' in self._meta: + return self._meta['pending'] + info = rrdtool.info(self.path) + result = {} + for field in self.field_names: + result[field] = float(info.get('ds[%s].last_ds' % field) or 0) + return result + + def put(self, values, timestamp): + if timestamp - self.last < self.step: + self._meta['pending'] = values + self._meta.commit() + return + if 'pending' in self._meta: + pending = self._meta.pop('pending') + if timestamp - self.last >= self.step * 2: + self.put(pending, self.last + self.step) + self._meta.commit() + if not self.first: + self._meta['first'] = timestamp + self._meta.commit() + value = [str(timestamp)] + for name in self.field_names: + value.append(str(values[name])) + rrdtool.update(self.path, str(':'.join(value))) + self.last = timestamp + + def get(self, start, end, resolution): + (row_start, start, row_step), __, rows = rrdtool.fetch( + str(self.path), + 'AVERAGE', + '--start', str(start), + '--end', str(end), + '--resolution', str(resolution)) + for raw_row in rows: + row_start += row_step + row = {} + for i, value in enumerate(raw_row): + row[self.field_names[i]] = value or .0 + yield row_start, row + + def __cmp__(self, other): + return cmp(self.revision, other.revision) |