diff options
Diffstat (limited to 'sugar_network/toolkit')
-rw-r--r-- | sugar_network/toolkit/router.py | 14 | ||||
-rw-r--r-- | sugar_network/toolkit/rrd.py | 250 |
2 files changed, 264 insertions, 0 deletions
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 6e3b43b..4470767 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -168,6 +168,10 @@ class Request(dict): def method(self): return self.environ.get('REQUEST_METHOD') + @method.setter + def method(self, value): + self.environ['REQUEST_METHOD'] = value + @property def url(self): result = self.environ['PATH_INFO'] @@ -223,6 +227,8 @@ class Request(dict): @resource.setter def resource(self, value): + while len(self.path) < 1: + self.path.append(None) self.path[0] = value @property @@ -232,6 +238,8 @@ class Request(dict): @guid.setter def guid(self, value): + while len(self.path) < 2: + self.path.append(None) self.path[1] = value @property @@ -241,6 +249,8 @@ class Request(dict): @prop.setter def prop(self, value): + while len(self.path) < 3: + self.path.append(None) self.path[2] = value @property @@ -250,6 +260,8 @@ class Request(dict): @key.setter def key(self, value): + while len(self.path) < 4: + self.path.append(None) self.path[3] = value @property @@ -535,6 +547,8 @@ class Router(object): # To populate `exception` only raise finally: + this.request = request + this.response = response for i in self._postroutes: result = i(result, exception) diff --git a/sugar_network/toolkit/rrd.py b/sugar_network/toolkit/rrd.py new file mode 100644 index 0000000..d8386e5 --- /dev/null +++ b/sugar_network/toolkit/rrd.py @@ -0,0 +1,250 @@ +# Copyright (C) 2012-2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Convenient access to RRD databases.""" + +import re +import os +import time +import bisect +import logging +from os.path import exists, join, splitext, basename + +import rrdtool + +from . import Bin + + +_DB_FILENAME_RE = re.compile('(.*?)(-[0-9]+){0,1}\\.rrd$') +_INFO_RE = re.compile('([^[]+)\\[([^]]+)\\]\\.(.*)$') + +_FETCH_PAGE = 256 + +_logger = logging.getLogger('rrd') + + +class Rrd(object): + + def __init__(self, root, name, fields, step, rras): + self._root = root + self.name = name + self.step = step + self._fields = fields or {} + # rrdtool knows nothing about `unicode` + self._rras = [i.encode('utf8') for i in rras or []] + self._revisions = [] + self._db = None + + _logger.debug('[%s] open rrd at %r', self.name, root) + + if not exists(self._root): + os.makedirs(self._root) + + for filename in os.listdir(self._root): + match = _DB_FILENAME_RE.match(filename) + if match is None: + continue + name_, revision = match.groups() + if name_ == name: + self._load(filename, int(revision or 0)) + + @property + def files(self): + for rev in self._revisions: + yield rev.path + + @property + def first(self): + return self._revisions[0].first if self._revisions else None + + @property + def last(self): + return self._revisions[-1].last if self._revisions else None + + def values(self, timestamp=None): + return self._revisions[-1].values(timestamp) if self._revisions else {} + + def put(self, values, timestamp=None): + if not timestamp: + timestamp = int(time.time()) + timestamp = timestamp / self.step * self.step + _logger.trace('[%s] put %r', self.name, values) + self._get_db(timestamp, values).put(values, timestamp) + + def get(self, start=None, end=None, resolution=None): + if not self._revisions: + return + + if not resolution: + resolution = self.step + + if start is None: + start = self._revisions[0].first + if end is None: + end = self._revisions[-1].last + + revisions = [] + for db in reversed(self._revisions): + revisions.append(db) + if db.last <= start: + break + + start = start - start % self.step - self.step + last = min(end, start + _FETCH_PAGE * resolution) + last -= last % self.step + self.step + + for db in reversed(revisions): + db_end = min(last, db.last - self.step) + if start > db_end: + break + for ts, row in db.get(start, db_end, resolution): + if ts > end: + break + yield ts, row + start = db_end + 1 + + def _get_db(self, timestamp, values): + if self._db is not None: + return self._db + + fields = [] + for field in sorted(values.keys()): + ds = self._fields.get(field) or {} + ds_type = ds.get('type') or 'GAUGE' + ds_heartbeat = ds.get('heartbeat') or self.step * 2 + fields.append('DS:%s:%s:%s:U:U' % (field, ds_type, ds_heartbeat)) + _logger.debug('[%s] fields from jut values: %r', self.name, fields) + + if not self._revisions: + self._db = self._create_db(0, fields, timestamp) + else: + db = self._revisions[-1] + if db.fields == fields and db.rras == self._rras: + self._db = db + else: + self._db = self._create_db(db.revision + 1, fields, db.last) + + return self._db + + def _create_db(self, revision, fields, timestamp): + filename = self.name + if revision: + filename += '-%s' % revision + filename += '.rrd' + _logger.debug('[%s] create database filename=%s start=%s step=%s', + self.name, filename, timestamp, self.step) + rrdtool.create( + str(join(self._root, filename)), + '--start', str(timestamp - self.step), + '--step', str(self.step), + *(fields + self._rras)) + return self._load(filename, revision) + + def _load(self, filename, revision): + _logger.debug('[%s] load database filename=%s revision=%s', + self.name, filename, revision) + db = _Db(join(self._root, filename), revision) + bisect.insort(self._revisions, db) + return db + + +class _Db(object): + + def __init__(self, path, revision=0): + self.path = str(path) + basepath = splitext(path)[0] + self.name = basename(basepath) + self._meta = Bin(basepath + '.meta', {}) + self.revision = revision + self.fields = [] + self.field_names = [] + self.rras = [] + + info = rrdtool.info(self.path) + self.step = info['step'] + self.last = info['last_update'] + + fields = {} + rras = {} + for key, value in info.items(): + match = _INFO_RE.match(key) + if match is None: + continue + prefix, key, prop = match.groups() + if prefix == 'ds': + fields.setdefault(key, {}) + fields[key][prop] = value + if prefix == 'rra': + rras.setdefault(key, {}) + rras[key][prop] = value + for index in sorted([int(i) for i in rras.keys()]): + rra = rras[str(index)] + self.rras.append( + 'RRA:%(cf)s:%(xff)s:%(pdp_per_row)s:%(rows)s' % rra) + for name in sorted(fields.keys()): + ds = fields[name] + self.fields.append('DS:%s:%s:%s:U:U' % + (name, ds['type'], ds['minimal_heartbeat'])) + self.field_names.append(name) + + @property + def first(self): + return self._meta['first'] or 0 + + def values(self, timestamp): + if timestamp and timestamp - self.last <= self.step and \ + 'pending' in self._meta: + return self._meta['pending'] + info = rrdtool.info(self.path) + result = {} + for field in self.field_names: + result[field] = float(info.get('ds[%s].last_ds' % field) or 0) + return result + + def put(self, values, timestamp): + if timestamp - self.last < self.step: + self._meta['pending'] = values + self._meta.commit() + return + if 'pending' in self._meta: + pending = self._meta.pop('pending') + if timestamp - self.last >= self.step * 2: + self.put(pending, self.last + self.step) + self._meta.commit() + if not self.first: + self._meta['first'] = timestamp + self._meta.commit() + value = [str(timestamp)] + for name in self.field_names: + value.append(str(values[name])) + rrdtool.update(self.path, str(':'.join(value))) + self.last = timestamp + + def get(self, start, end, resolution): + (row_start, start, row_step), __, rows = rrdtool.fetch( + str(self.path), + 'AVERAGE', + '--start', str(start), + '--end', str(end), + '--resolution', str(resolution)) + for raw_row in rows: + row_start += row_step + row = {} + for i, value in enumerate(raw_row): + row[self.field_names[i]] = value or .0 + yield row_start, row + + def __cmp__(self, other): + return cmp(self.revision, other.revision) |