diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2012-08-09 16:59:50 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2012-08-09 17:30:56 (GMT) |
commit | f81fee8335345b025610fd8789ee19c03d9e15f4 (patch) | |
tree | 6d0a0fb5013ee204494e1c34fcf6efb6cbede4be /sugar_network/node/stats.py | |
parent | c8ce1a074715d809dd3ec435978113e5b0c30ed7 (diff) |
Process stats in node sync
Diffstat (limited to 'sugar_network/node/stats.py')
-rw-r--r-- | sugar_network/node/stats.py | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/sugar_network/node/stats.py b/sugar_network/node/stats.py index 0f6a049..817def5 100644 --- a/sugar_network/node/stats.py +++ b/sugar_network/node/stats.py @@ -13,7 +13,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import logging +from os.path import join, exists, isdir + +from pylru import lrucache + from active_toolkit.options import Option +from sugar_network.toolkit.rrd import Rrd, ReadOnlyRrd +from sugar_network.toolkit.collection import Sequence, PersistentSequence stats = Option( @@ -32,3 +40,64 @@ stats_rras = Option( 'space separated list of RRAs for RRD databases', default=['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'], type_cast=Option.list_cast, type_repr=Option.list_repr) + + +_logger = logging.getLogger('node.stats') +_cache = lrucache(32) + + +def get_rrd(user): + if user in _cache: + return _cache[user] + else: + rrd = _cache[user] = Rrd(join(stats_root.value, user[:2], user), + stats_step.value, stats_rras.value) + return rrd + + +def pull(in_seq, packet): + for user, rrd in _walk_rrd(): + in_seq.setdefault(user, {}) + + for db, db_start, db_end in rrd.dbs: + seq = in_seq[user].get(db) + if seq is None: + seq = in_seq[user][db] = PersistentSequence( + join(rrd.root, db + '.push'), [1, None]) + elif seq is not dict: + seq = in_seq[user][db] = Sequence(seq) + out_seq = Sequence() + + def dump(): + for start, end in seq: + for timestamp, values in \ + rrd.get(db, max(start, db_start), end or db_end): + yield {'timestamp': timestamp, 'values': values} + seq.exclude(start, timestamp) + out_seq.include(start, timestamp) + start = timestamp + + packet.push(dump(), arcname=join('stats', user, db), + cmd='stats_push', user=user, db=db, + sequence=out_seq) + + +def commit(sequences): + for user, dbs in sequences.items(): + for db, merged in dbs.items(): + seq = PersistentSequence( + join(stats_root.value, user[:2], user, db + '.push'), + [1, None]) + seq.exclude(merged) + seq.commit() + + +def _walk_rrd(): + if not exists(stats_root.value): + return + for users_dirname in os.listdir(stats_root.value): + users_dir = join(stats_root.value, users_dirname) + if not isdir(users_dir): + continue + for user in os.listdir(users_dir): + yield user, ReadOnlyRrd(join(users_dir, user)) |