Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/node/stats_user.py
blob: 0b9644957976a66b3502595c9ebda8aacbc26c39 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Copyright (C) 2012-2013 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import logging
from os.path import join, exists, isdir

from sugar_network import node, toolkit
from sugar_network.toolkit.rrd import Rrd
from sugar_network.toolkit import Option, pylru, enforce


stats_user = Option(
        'accept personalized users statistics',
        default=False, type_cast=Option.bool_cast, action='store_true')

stats_user_step = Option(
        'step interval in seconds for users\' RRD databases',
        default=60, type_cast=int)

stats_user_rras = Option(
        'comma separated list of RRAs for users\' RRD databases',
        default=[
            'RRA:AVERAGE:0.5:1:4320',   # one day with 60s step
            'RRA:AVERAGE:0.5:5:2016',   # one week with 5min step
            ],
        type_cast=Option.list_cast, type_repr=Option.list_repr)

_logger = logging.getLogger('node.stats_user')
_user_cache = pylru.lrucache(32)


def get_rrd(user):
    if user in _user_cache:
        return _user_cache[user]
    else:
        rrd = _user_cache[user] = Rrd(_rrd_path(user),
                stats_user_step.value, stats_user_rras.value)
        return rrd


def diff(in_info=None):
    if in_info is None:
        in_info = {}
    out_info = {}

    try:
        for user, rrd in _walk_rrd(join(node.stats_root.value, 'user')):
            in_info.setdefault(user, {})
            out_info.setdefault(user, {})

            for db in rrd:
                yield {'db': db.name, 'user': user}

                in_seq = in_info[user].get(db.name)
                if in_seq is None:
                    in_seq = toolkit.PersistentSequence(
                            join(rrd.root, db.name + '.push'), [1, None])
                    in_info[user][db.name] = in_seq
                elif in_seq is not toolkit.Sequence:
                    in_seq = in_info[user][db.name] = toolkit.Sequence(in_seq)
                out_seq = out_info[user].setdefault(db.name,
                        toolkit.Sequence())

                for start, end in in_seq:
                    for timestamp, values in \
                            db.get(max(start, db.first), end or db.last):
                        yield {'timestamp': timestamp, 'values': values}
                        in_seq.exclude(start, timestamp)
                        out_seq.include(start, timestamp)
                        start = timestamp
    except StopIteration:
        pass

    yield {'commit': out_info}


def merge(packet):
    db = None
    seq = None

    for record in packet:
        if 'db' in record:
            db = get_rrd(record['user'])[record['db']]
        elif 'commit' in record:
            seq = record['commit']
        else:
            enforce(db is not None, 'Malformed user stats diff')
            db.put(record['values'], record['timestamp'])

    return seq


def commit(info):
    for user, dbs in info.items():
        for db_name, merged in dbs.items():
            seq = toolkit.PersistentSequence(
                    _rrd_path(user, db_name + '.push'), [1, None])
            seq.exclude(merged)
            seq.commit()


def _walk_rrd(root):
    if not exists(root):
        return
    for users_dirname in os.listdir(root):
        users_dir = join(root, users_dirname)
        if not isdir(users_dir):
            continue
        for user in os.listdir(users_dir):
            yield user, Rrd(join(users_dir, user), stats_user_step.value)


def _rrd_path(user, *args):
    return join(node.stats_root.value, 'user', user[:2], user, *args)