Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network')
-rw-r--r-- sugar_network/db/routes.py 6
-rw-r--r-- sugar_network/model/post.py 14
-rw-r--r-- sugar_network/node/model.py 22
-rw-r--r-- sugar_network/node/routes.py 4
-rw-r--r-- sugar_network/node/stats.py 269
-rw-r--r-- sugar_network/toolkit/router.py 14
-rw-r--r-- sugar_network/toolkit/rrd.py 250
7 files changed, 539 insertions, 40 deletions
diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py
index 22e3782..221d066 100644
--- a/sugar_network/db/routes.py
+++ b/sugar_network/db/routes.py
@@ -48,7 +48,8 @@ class Routes(object):
authors = doc.posts['author'] = {}
self._useradd(authors, this.principal, Author.ORIGINAL)
self.volume[this.request.resource].create(doc.posts)
- return doc['guid']
+ this.request.guid = doc.guid
+ return doc.guid
@route('PUT', [None, None], acl=ACL.AUTH | ACL.AUTHOR)
def update(self):
@@ -194,7 +195,6 @@ class Routes(object):
'GUID in cannot be changed')
doc = self.volume[this.request.resource][this.request.guid]
enforce(doc.available, 'Resource not found')
- this.resource = doc
def teardown(new, old):
for name, value in new.items():
@@ -254,7 +254,7 @@ class Routes(object):
def _aggpost(self, acl):
request = this.request
- doc = this.resource = self.volume[request.resource][request.guid]
+ doc = self.volume[request.resource][request.guid]
prop = doc.metadata[request.prop]
enforce(isinstance(prop, Aggregated), http.BadRequest,
'Property is not aggregated')
diff --git a/sugar_network/model/post.py b/sugar_network/model/post.py
index e0b3b25..a4c7dbf 100644
--- a/sugar_network/model/post.py
+++ b/sugar_network/model/post.py
@@ -15,7 +15,6 @@
from sugar_network import db, model
from sugar_network.toolkit.router import ACL
-from sugar_network.toolkit.coroutine import this
class Post(db.Resource):
@@ -52,19 +51,6 @@ class Post(db.Resource):
def vote(self, value):
return value
- @vote.setter
- def vote(self, value):
- if value:
- if self['topic']:
- resource = this.volume['post']
- guid = self['topic']
- else:
- resource = this.volume['context']
- guid = self['context']
- orig = resource[guid]['rating']
- resource.update(guid, {'rating': [orig[0] + 1, orig[1] + value]})
- return value
-
@db.indexed_property(db.Aggregated, prefix='D', full_text=True,
subtype=db.Localized())
def comments(self, value):
diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py
index 87abed6..48fa5a3 100644
--- a/sugar_network/node/model.py
+++ b/sugar_network/node/model.py
@@ -616,28 +616,6 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None,
return context, release
-def generate_node_stats(volume):
-
- def calc_rating(**kwargs):
- rating = [0, 0]
- alldocs, __ = volume['post'].find(**kwargs)
- for post in alldocs:
- if post['vote']:
- rating[0] += 1
- rating[1] += post['vote']
- return rating
-
- alldocs, __ = volume['context'].find()
- for context in alldocs:
- rating = calc_rating(type='review', context=context.guid)
- volume['context'].update(context.guid, {'rating': rating})
-
- alldocs, __ = volume['post'].find(topic='')
- for topic in alldocs:
- rating = calc_rating(type='feedback', topic=topic.guid)
- volume['post'].update(topic.guid, {'rating': rating})
-
-
def _load_context_metadata(bundle, spec):
result = {}
for prop in ('homepage', 'mime_types'):
diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py
index 45e6e20..e5887bc 100644
--- a/sugar_network/node/routes.py
+++ b/sugar_network/node/routes.py
@@ -25,6 +25,7 @@ from os.path import join, exists
from sugar_network import db, toolkit
from sugar_network.model import FrontRoutes
from sugar_network.node import model
+from sugar_network.node.stats import StatRoutes
from sugar_network.toolkit.router import ACL, File, route
from sugar_network.toolkit.router import fallbackroute, preroute, postroute
from sugar_network.toolkit.spec import parse_version
@@ -38,11 +39,12 @@ _GUID_RE = re.compile('[a-zA-Z0-9_+-.]+$')
_logger = logging.getLogger('node.routes')
-class NodeRoutes(db.Routes, FrontRoutes):
+class NodeRoutes(db.Routes, FrontRoutes, StatRoutes):
def __init__(self, guid, auth=None, **kwargs):
db.Routes.__init__(self, **kwargs)
FrontRoutes.__init__(self)
+ StatRoutes.__init__(self)
self._guid = guid
self._auth = auth
self._batch_dir = join(self.volume.root, 'batch')
diff --git a/sugar_network/node/stats.py b/sugar_network/node/stats.py
new file mode 100644
index 0000000..d1b0d63
--- /dev/null
+++ b/sugar_network/node/stats.py
@@ -0,0 +1,269 @@
+# Copyright (C) 2014 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import time
+import logging
+
+from sugar_network.toolkit.rrd import Rrd
+from sugar_network.toolkit.router import route, postroute, Request
+from sugar_network.toolkit.coroutine import this
+from sugar_network.toolkit import Option, coroutine, enforce
+
+
+# Switch that turns statistics collection on; it is off by default
+stats = Option(
+ 'collect unpersonalized node statistics',
+ default=False, type_cast=Option.bool_cast, action='store_true')
+
+# Length of a single RRD step in seconds; 5 minutes by default
+stats_step = Option(
+ 'step interval in seconds for RRD statistics database',
+ default=60 * 5, type_cast=int)
+
+# Archives the statistics database gets created with; durations in the
+# per-archive comments below hold for the default 5 minutes `stats_step`
+stats_rras = Option(
+ 'comma separated list of RRAs for RRD statistics database',
+ default=[
+ 'RRA:AVERAGE:0.5:1:864', # 3d with 5min step
+ 'RRA:AVERAGE:0.5:288:3660', # 10y with 1d step
+ 'RRA:AVERAGE:0.5:2880:366', # 10y with 10d step
+ 'RRA:AVERAGE:0.5:8640:122', # 10y with 30d step
+ 'RRA:AVERAGE:0.5:105120:10', # 10y with 1y step
+ ],
+ type_cast=Option.list_cast, type_repr=Option.list_repr)
+
+# One year in seconds; used as the heartbeat of gauges, which get updated
+# only when something changes, so that long pauses between commits do not
+# exceed the heartbeat
+_HEARTBEAT_EVER = 60 * 60 * 24 * 365
+
+# RRD data sources, i.e., statistics fields being collected.
+#
+# GAUGE fields hold the current number of documents; `resource` and `query`
+# tell `StatRoutes.stats_init()` in what directory and how to count them
+# when there are no stored values yet. ABSOLUTE fields count events and are
+# zeroed by `StatRoutes.stats_commit()` after every commit.
+_DS = {
+    'contexts': {
+        'type': 'GAUGE',
+        'heartbeat': _HEARTBEAT_EVER,
+        'resource': 'context',
+        'query': {},
+        },
+    'released': {
+        'type': 'ABSOLUTE',
+        },
+    'solved': {
+        'type': 'ABSOLUTE',
+        },
+    'reported': {
+        'type': 'ABSOLUTE',
+        },
+    # Topics and posts are both `post` documents; they differ only in
+    # having the `topic` property empty or not
+    'topics': {
+        'type': 'GAUGE',
+        'heartbeat': _HEARTBEAT_EVER,
+        'resource': 'post',
+        'query': {'topic': ''},
+        },
+    'posts': {
+        'type': 'GAUGE',
+        'heartbeat': _HEARTBEAT_EVER,
+        'resource': 'post',
+        'query': {'not_topic': ''},
+        },
+    'users': {
+        'type': 'GAUGE',
+        'heartbeat': _HEARTBEAT_EVER,
+        'resource': 'user',
+        'query': {},
+        },
+    }
+
+# Requests that affect statistics.
+#
+# Keys are `(method, resource, prop, cmd)` tuples of a processed request;
+# values are `(field, shift)` pairs where `field` is either a `_DS` key or
+# a callable returning one, and `shift` is added to the field's counter.
+_ROUTES = {
+ ('POST', 'context', None, None):
+ ('contexts', +1),
+ ('DELETE', 'context', None, None):
+ ('contexts', -1),
+ ('POST', 'context', 'releases', None):
+ ('released', +1),
+ ('GET', 'context', None, 'solve'):
+ ('solved', +1),
+ ('POST', 'report', None, None):
+ ('reported', +1),
+ # A post with non-empty `topic` counts as a post, otherwise as a topic
+ ('POST', 'post', None, None):
+ (lambda: 'posts' if this.resource['topic'] else 'topics', +1),
+ ('DELETE', 'post', None, None):
+ (lambda: 'posts' if this.resource['topic'] else 'topics', -1),
+ ('POST', 'user', None, None):
+ ('users', +1),
+ ('DELETE', 'user', None, None):
+ ('users', -1),
+ }
+
+# Upper limit for the `records` argument of the `stats` command
+_MAX_STAT_RECORDS = 100
+
+_logger = logging.getLogger('node.stats')
+
+
+class StatRoutes(object):
+    """Routes mixin that collects node statistics and document ratings.
+
+    Collecting is disabled until `stats_init()` gets called; before that,
+    the post-route hook does nothing and the `stats` command is refused.
+
+    """
+
+    # `Rrd` storage; `None` while statistics are disabled
+    _rrd = None
+    # Current value for every `_DS` field, to store on the next commit
+    _stats = None
+    # Not yet committed rating changes,
+    # `{resource: {guid: [reviews_number_shift, votes_sum_shift]}}`
+    _rating = None
+    # Whether any statistics request happened since the last commit
+    _stated = False
+
+    def stats_init(self, path, step, rras):
+        """Open statistics database and load current counters.
+
+        :param path:
+            directory to keep RRD files in
+        :param step:
+            RRD step interval in seconds
+        :param rras:
+            list of RRA definitions to create new databases with
+
+        """
+        _logger.info('Collect node stats in %r', path)
+
+        self._rrd = Rrd(path, 'stats', _DS, step, rras)
+        self._stats = self._rrd.values()
+        self._rating = {'context': {}, 'post': {}}
+
+        if not self._stats:
+            # Nothing is stored yet; seed gauges from the volume using the
+            # second value `find()` returns (presumably the total number
+            # of matched documents), the rest of fields start from zero
+            for field, traits in _DS.items():
+                value = 0
+                if traits['type'] == 'GAUGE':
+                    directory = this.volume[traits['resource']]
+                    __, value = directory.find(limit=0, **traits['query'])
+                self._stats[field] = value
+
+    @postroute
+    def stat_on_postroute(self, result, exception, stat_rating=True):
+        """Account the just processed request in statistics.
+
+        :param result:
+            route result; always returned as is
+        :param exception:
+            exception raised while processing the request, if any;
+            failed requests are not counted
+        :param stat_rating:
+            whether to accumulate rating changes for newly created posts
+        :returns:
+            `result` untouched
+
+        """
+        if self._rrd is None or exception is not None:
+            return result
+
+        r = this.request
+        route_ = _ROUTES.get((r.method, r.resource, r.prop, r.cmd))
+        if route_ is None:
+            return result
+        stat, shift = route_
+        self._stated = True
+
+        if not isinstance(stat, basestring):
+            # The field to change depends on the processed resource
+            stat = stat()
+        self._stats[stat] += shift
+
+        if stat_rating and r.method == 'POST' and r.resource == 'post':
+            rating = None
+            if stat == 'topics':
+                # Among topics, only reviews rate something, namely their
+                # context; other topics have empty `topic` and must not
+                # pile up rating under an empty guid
+                if this.resource['type'] == 'review':
+                    rating = self._rating['context']
+                    rating = rating.setdefault(
+                            this.resource['context'], [0, 0])
+            else:
+                rating = self._rating['post']
+                rating = rating.setdefault(this.resource['topic'], [0, 0])
+            if rating is not None:
+                rating[0] += shift
+                rating[1] += shift * this.resource['vote']
+
+        return result
+
+    @route('GET', cmd='stats', arguments={
+        'start': int, 'end': int, 'records': int, 'source': list},
+        mime_type='application/json')
+    def stats(self, start, end, records, source):
+        """Return collected statistics for a time interval.
+
+        :param start:
+            timestamp to start from; if omitted, the first stored one
+        :param end:
+            timestamp to stop at; if omitted, the last stored one
+        :param records:
+            number of records to split the interval to; gets cut down to
+            `_MAX_STAT_RECORDS`, non-positive values mean a tenth of it
+        :param source:
+            list of field names to return; all fields if omitted
+        :returns:
+            list of `(timestamp, {field: value})` tuples
+
+        """
+        enforce(self._rrd is not None, 'Statistics disabled')
+
+        if not start:
+            start = self._rrd.first or 0
+        if not end:
+            end = self._rrd.last or 0
+        if records > _MAX_STAT_RECORDS:
+            _logger.debug('Decrease %d stats records number to %d',
+                    records, _MAX_STAT_RECORDS)
+            records = _MAX_STAT_RECORDS
+        elif records <= 0:
+            records = _MAX_STAT_RECORDS / 10
+        resolution = max(1, (end - start) / records)
+
+        result = []
+        for ts, values in self._rrd.get(start, end, resolution):
+            if source:
+                values = dict([(i, values[i]) for i in source])
+            result.append((ts, values))
+        return result
+
+    def stats_auto_commit(self):
+        """Commit statistics every RRD step; never returns."""
+        while True:
+            coroutine.sleep(self._rrd.step)
+            self.stats_commit()
+
+    def stats_commit(self, timestamp=None):
+        """Store current counters and apply accumulated rating changes.
+
+        Does nothing if no statistics requests happened since the
+        previous commit.
+
+        :param timestamp:
+            time to store counters for; passed to `Rrd.put()` as is
+
+        """
+        if not self._stated:
+            return
+        self._stated = False
+
+        _logger.trace('Commit stats')
+
+        self._rrd.put(self._stats, timestamp)
+        for field, traits in _DS.items():
+            # ABSOLUTE fields count events between commits
+            if traits['type'] == 'ABSOLUTE':
+                self._stats[field] = 0
+
+        for resource, stats_ in self._rating.items():
+            directory = this.volume[resource]
+            for guid, (reviews, votes) in stats_.items():
+                rating = directory[guid]['rating']
+                directory.update(guid, {
+                    'rating': [rating[0] + reviews, rating[1] + votes],
+                    })
+            stats_.clear()
+
+    def stats_regen(self, path, step, rras):
+        """Recreate statistics database out of existing documents.
+
+        Existing RRD files get removed. Then, documents are replayed as
+        if they were just created, in `ctime` order, interval by interval,
+        starting from the oldest context. Only creations are replayed;
+        ratings are not touched. If there are no contexts, the database
+        is left empty.
+
+        :param path:
+            directory to keep RRD files in
+        :param step:
+            RRD step interval in seconds
+        :param rras:
+            list of RRA definitions to create the database with
+
+        """
+        for i in Rrd(path, 'stats', _DS, step, rras).files:
+            os.unlink(i)
+        self.stats_init(path, step, rras)
+        for field in self._stats:
+            self._stats[field] = 0
+
+        def timeline(ts):
+            # Yield `(left, right, step)` intervals from `ts` till now;
+            # the closer to now, the finer is the step, following
+            # the time ranges `rras` archives cover
+            ts = long(ts)
+            end = long(time.time())
+            step_ = None
+
+            archives = {}
+            for rra in rras:
+                a_step, a_size = [long(i) for i in rra.split(':')[-2:]]
+                a_step *= step
+                a_start = end - min(end, a_step * a_size)
+                # Out of archives that start at the same time,
+                # keep the one with the finest step
+                if archives.setdefault(a_start, a_step) > a_step:
+                    archives[a_start] = a_step
+            archives = list(sorted(archives.items()))
+
+            while ts <= end:
+                while not step_ or archives and ts >= archives[0][0]:
+                    archive_start, step_ = archives.pop(0)
+                    ts = max(ts / step_ * step_, archive_start)
+                yield ts, ts + step_ - 1, step_
+                ts += step_
+
+        items, __ = this.volume['context'].find(limit=1, order_by='ctime')
+        oldest = next(iter(items), None)
+        if oldest is None:
+            # There is no starting point for the timeline
+            _logger.info('No contexts found, nothing to regenerate')
+            return
+        start = oldest['ctime']
+        for left, right, __ in timeline(start):
+            for resource in ('user', 'context', 'post', 'report'):
+                items, __ = this.volume[resource].find(
+                        query='ctime:%s..%s' % (left, right))
+                for this.resource in items:
+                    this.request = Request(method='POST', path=[resource])
+                    self.stat_on_postroute(None, None, False)
+            self.stats_commit(left + (right - left) / 2)
+
+    def stats_regen_rating(self, path, step, rras):
+        """Recalculate ratings of all contexts and topics from posts.
+
+        Context ratings are built out of reviews; topic ratings, out of
+        posts that belong to the topic.
+
+        :param path:
+            not used
+        :param step:
+            not used
+        :param rras:
+            not used
+
+        """
+
+        def calc_rating(**kwargs):
+            # Return `[posts_number, votes_sum]` for posts found by `kwargs`
+            rating = [0, 0]
+            alldocs, __ = this.volume['post'].find(**kwargs)
+            for post in alldocs:
+                rating[0] += 1
+                rating[1] += post['vote']
+            return rating
+
+        alldocs, __ = this.volume['context'].find()
+        for context in alldocs:
+            rating = calc_rating(type='review', context=context.guid)
+            this.volume['context'].update(context.guid, {'rating': rating})
+
+        alldocs, __ = this.volume['post'].find(topic='')
+        for topic in alldocs:
+            rating = calc_rating(topic=topic.guid)
+            this.volume['post'].update(topic.guid, {'rating': rating})
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index 6e3b43b..4470767 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -168,6 +168,10 @@ class Request(dict):
def method(self):
return self.environ.get('REQUEST_METHOD')
+ @method.setter
+ def method(self, value):
+ self.environ['REQUEST_METHOD'] = value
+
@property
def url(self):
result = self.environ['PATH_INFO']
@@ -223,6 +227,8 @@ class Request(dict):
@resource.setter
def resource(self, value):
+ while len(self.path) < 1:
+ self.path.append(None)
self.path[0] = value
@property
@@ -232,6 +238,8 @@ class Request(dict):
@guid.setter
def guid(self, value):
+ while len(self.path) < 2:
+ self.path.append(None)
self.path[1] = value
@property
@@ -241,6 +249,8 @@ class Request(dict):
@prop.setter
def prop(self, value):
+ while len(self.path) < 3:
+ self.path.append(None)
self.path[2] = value
@property
@@ -250,6 +260,8 @@ class Request(dict):
@key.setter
def key(self, value):
+ while len(self.path) < 4:
+ self.path.append(None)
self.path[3] = value
@property
@@ -535,6 +547,8 @@ class Router(object):
# To populate `exception` only
raise
finally:
+ this.request = request
+ this.response = response
for i in self._postroutes:
result = i(result, exception)
diff --git a/sugar_network/toolkit/rrd.py b/sugar_network/toolkit/rrd.py
new file mode 100644
index 0000000..d8386e5
--- /dev/null
+++ b/sugar_network/toolkit/rrd.py
@@ -0,0 +1,250 @@
+# Copyright (C) 2012-2014 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Convenient access to RRD databases."""
+
+import re
+import os
+import time
+import bisect
+import logging
+from os.path import exists, join, splitext, basename
+
+import rrdtool
+
+from . import Bin
+
+
+# Splits an RRD file name into the database name and an optional revision
+# suffix; note that the suffix group keeps its leading dash, e.g., `-2`
+_DB_FILENAME_RE = re.compile('(.*?)(-[0-9]+){0,1}\\.rrd$')
+# Splits `rrdtool.info()` keys that look like `prefix[key].prop`,
+# e.g., `ds[name].last_ds`
+_INFO_RE = re.compile('([^[]+)\\[([^]]+)\\]\\.(.*)$')
+
+# Multiplied by the resolution in `Rrd.get()` to limit the fetched range
+_FETCH_PAGE = 256
+
+_logger = logging.getLogger('rrd')
+
+
+class Rrd(object):
+    """One logical RRD database stored as a set of revision files.
+
+    Files are named `<name>.rrd` for the initial revision and
+    `<name>-<revision>.rrd` for the following ones. A new revision gets
+    created if fields or RRAs to store differ from what the most recent
+    file was created with.
+
+    """
+
+    def __init__(self, root, name, fields, step, rras):
+        """Open the database and load all its existing revisions.
+
+        :param root:
+            directory with RRD files; created if missing
+        :param name:
+            database name, i.e., the RRD file name without revision suffix
+        :param fields:
+            dictionary of DS traits, `type` and `heartbeat`, by field name
+        :param step:
+            step interval in seconds
+        :param rras:
+            list of RRA definitions to create new revisions with
+
+        """
+        self._root = root
+        self.name = name
+        self.step = step
+        self._fields = fields or {}
+        # rrdtool knows nothing about `unicode`
+        self._rras = [i.encode('utf8') for i in rras or []]
+        self._revisions = []
+        self._db = None
+
+        _logger.debug('[%s] open rrd at %r', self.name, root)
+
+        if not exists(self._root):
+            os.makedirs(self._root)
+
+        for filename in os.listdir(self._root):
+            match = _DB_FILENAME_RE.match(filename)
+            if match is None:
+                continue
+            name_, revision = match.groups()
+            if name_ == name:
+                # The matched suffix keeps its leading dash, e.g., `-2`;
+                # drop the sign, otherwise reloaded revisions are sorted
+                # in reverse order and the oldest file looks like the
+                # most recent one
+                self._load(filename, abs(int(revision or 0)))
+
+    @property
+    def files(self):
+        """Iterate over paths of all revision files, oldest first."""
+        for rev in self._revisions:
+            yield rev.path
+
+    @property
+    def first(self):
+        """First stored timestamp, or `None` if there are no files."""
+        return self._revisions[0].first if self._revisions else None
+
+    @property
+    def last(self):
+        """Last updated timestamp, or `None` if there are no files."""
+        return self._revisions[-1].last if self._revisions else None
+
+    def values(self, timestamp=None):
+        """Return recent field values from the latest revision.
+
+        :param timestamp:
+            passed to the latest revision as is, to let it prefer values
+            that are pending to be stored
+        :returns:
+            dictionary of values by field name; empty if there are no files
+
+        """
+        return self._revisions[-1].values(timestamp) if self._revisions else {}
+
+    def put(self, values, timestamp=None):
+        """Store field values.
+
+        :param values:
+            dictionary of values by field name
+        :param timestamp:
+            time to store values for, gets aligned down to the step;
+            the current time by default
+
+        """
+        if not timestamp:
+            timestamp = int(time.time())
+        timestamp = timestamp / self.step * self.step
+        _logger.trace('[%s] put %r', self.name, values)
+        self._get_db(timestamp, values).put(values, timestamp)
+
+    def get(self, start=None, end=None, resolution=None):
+        """Iterate over stored values.
+
+        :param start:
+            timestamp to start from; the first stored one by default
+        :param end:
+            timestamp to stop at; the last stored one by default
+        :param resolution:
+            interval in seconds between rows; the step by default
+        :returns:
+            generator of `(timestamp, {field: value})` tuples
+
+        """
+        if not self._revisions:
+            return
+
+        if not resolution:
+            resolution = self.step
+
+        if start is None:
+            start = self._revisions[0].first
+        if end is None:
+            end = self._revisions[-1].last
+
+        # Pick up, newest first, only revisions that might contain
+        # data after `start`
+        revisions = []
+        for db in reversed(self._revisions):
+            revisions.append(db)
+            if db.last <= start:
+                break
+
+        start = start - start % self.step - self.step
+        last = min(end, start + _FETCH_PAGE * resolution)
+        last -= last % self.step + self.step
+
+        for db in reversed(revisions):
+            db_end = min(last, db.last - self.step)
+            if start > db_end:
+                break
+            for ts, row in db.get(start, db_end, resolution):
+                if ts > end:
+                    break
+                yield ts, row
+            start = db_end + 1
+
+    def _get_db(self, timestamp, values):
+        # Return the revision to put `values` to, creating a new one if
+        # the latest file does not match required fields or RRAs; the
+        # choice is made only once and is reused by further calls
+        if self._db is not None:
+            return self._db
+
+        fields = []
+        for field in sorted(values.keys()):
+            ds = self._fields.get(field) or {}
+            ds_type = ds.get('type') or 'GAUGE'
+            ds_heartbeat = ds.get('heartbeat') or self.step * 2
+            fields.append('DS:%s:%s:%s:U:U' % (field, ds_type, ds_heartbeat))
+        _logger.debug('[%s] fields from jut values: %r', self.name, fields)
+
+        if not self._revisions:
+            self._db = self._create_db(0, fields, timestamp)
+        else:
+            db = self._revisions[-1]
+            if db.fields == fields and db.rras == self._rras:
+                self._db = db
+            else:
+                self._db = self._create_db(db.revision + 1, fields, db.last)
+
+        return self._db
+
+    def _create_db(self, revision, fields, timestamp):
+        # Create a new RRD file that starts one step before `timestamp`
+        # and load it as the given revision
+        filename = self.name
+        if revision:
+            filename += '-%s' % revision
+        filename += '.rrd'
+        _logger.debug('[%s] create database filename=%s start=%s step=%s',
+                self.name, filename, timestamp, self.step)
+        rrdtool.create(
+                str(join(self._root, filename)),
+                '--start', str(timestamp - self.step),
+                '--step', str(self.step),
+                *(fields + self._rras))
+        return self._load(filename, revision)
+
+    def _load(self, filename, revision):
+        # Open an existing RRD file and keep it among revisions,
+        # which are sorted by revision number
+        _logger.debug('[%s] load database filename=%s revision=%s',
+                self.name, filename, revision)
+        db = _Db(join(self._root, filename), revision)
+        bisect.insort(self._revisions, db)
+        return db
+
+
+class _Db(object):
+    """Single RRD file, i.e., one revision of an `Rrd` database.
+
+    Besides the RRD file itself, a `.meta` file next to it keeps the first
+    stored timestamp and values that came too early to be stored.
+
+    """
+
+    def __init__(self, path, revision=0):
+        """Load traits of an existing RRD file.
+
+        :param path:
+            path to the RRD file
+        :param revision:
+            revision number the file represents
+
+        """
+        self.path = str(path)
+        stem = splitext(path)[0]
+        self.name = basename(stem)
+        self._meta = Bin(stem + '.meta', {})
+        self.revision = revision
+        self.fields = []
+        self.field_names = []
+        self.rras = []
+
+        info = rrdtool.info(self.path)
+        self.step = info['step']
+        self.last = info['last_update']
+
+        # Group `prefix[key].prop` info entries by key
+        ds_props = {}
+        rra_props = {}
+        for info_key, info_value in info.items():
+            parsed = _INFO_RE.match(info_key)
+            if parsed is None:
+                continue
+            section, item, prop = parsed.groups()
+            if section == 'ds':
+                ds_props.setdefault(item, {})[prop] = info_value
+            elif section == 'rra':
+                rra_props.setdefault(item, {})[prop] = info_value
+
+        # Restore definitions in the same form they are passed
+        # to `rrdtool.create()`, to make them comparable
+        for index in sorted(int(i) for i in rra_props):
+            self.rras.append(
+                    'RRA:%(cf)s:%(xff)s:%(pdp_per_row)s:%(rows)s' %
+                    rra_props[str(index)])
+        for ds_name in sorted(ds_props):
+            ds = ds_props[ds_name]
+            self.fields.append('DS:%s:%s:%s:U:U' %
+                    (ds_name, ds['type'], ds['minimal_heartbeat']))
+            self.field_names.append(ds_name)
+
+    @property
+    def first(self):
+        """First stored timestamp, or `0` if nothing was stored."""
+        return self._meta['first'] or 0
+
+    def values(self, timestamp):
+        """Return recent field values.
+
+        :param timestamp:
+            if it is not further than one step from the last update,
+            pending values, if any, are returned instead of stored ones
+        :returns:
+            dictionary of values by field name
+
+        """
+        recent = timestamp and timestamp - self.last <= self.step
+        if recent and 'pending' in self._meta:
+            return self._meta['pending']
+        info = rrdtool.info(self.path)
+        return dict(
+                (field, float(info.get('ds[%s].last_ds' % field) or 0))
+                for field in self.field_names)
+
+    def put(self, values, timestamp):
+        """Store field values.
+
+        Values that come earlier than one step after the last update are
+        only remembered as pending. They get stored later, one step after
+        the last update, if the next values come two or more steps after
+        it; otherwise they are dropped.
+
+        :param values:
+            dictionary of values by field name
+        :param timestamp:
+            time to store values for
+
+        """
+        elapsed = timestamp - self.last
+        if elapsed < self.step:
+            self._meta['pending'] = values
+            self._meta.commit()
+            return
+        if 'pending' in self._meta:
+            pending = self._meta.pop('pending')
+            if elapsed >= self.step * 2:
+                self.put(pending, self.last + self.step)
+            self._meta.commit()
+        if not self.first:
+            self._meta['first'] = timestamp
+            self._meta.commit()
+        record = [str(timestamp)]
+        record.extend(str(values[name]) for name in self.field_names)
+        rrdtool.update(self.path, str(':'.join(record)))
+        self.last = timestamp
+
+    def get(self, start, end, resolution):
+        """Iterate over stored averages.
+
+        :param start:
+            timestamp to fetch from
+        :param end:
+            timestamp to fetch to
+        :param resolution:
+            interval in seconds between rows to ask rrdtool for
+        :returns:
+            generator of `(timestamp, {field: value})` tuples;
+            missing values are replaced by `0.0`
+
+        """
+        (row_ts, __, row_step), __, rows = rrdtool.fetch(
+                str(self.path),
+                'AVERAGE',
+                '--start', str(start),
+                '--end', str(end),
+                '--resolution', str(resolution))
+        for raw_row in rows:
+            row_ts += row_step
+            yield row_ts, dict(
+                    (self.field_names[i], value or .0)
+                    for i, value in enumerate(raw_row))
+
+    def __cmp__(self, other):
+        # Order files by revision number
+        return cmp(self.revision, other.revision)