1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# Copyright (C) 2012-2013 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
from os.path import join, exists, isdir
from sugar_network import node, toolkit
from sugar_network.toolkit.rrd import Rrd
from sugar_network.toolkit import Option, pylru, enforce
# Option controlling whether personalized (per-user) statistics are accepted.
stats_user = Option(
    'accept personalized users statistics',
    action='store_true', type_cast=Option.bool_cast, default=False)

# Step, in seconds, passed to every per-user Rrd object created below.
stats_user_step = Option(
    "step interval in seconds for users' RRD databases",
    type_cast=int, default=60)

# Round-robin archive definitions passed to Rrd when get_rrd() opens a
# user's databases. Each entry has the RRDtool form
# ``RRA:<CF>:<xff>:<steps per row>:<rows>``.
stats_user_rras = Option(
    "comma separated list of RRAs for users' RRD databases",
    type_repr=Option.list_repr, type_cast=Option.list_cast,
    default=[
        # 4320 rows of 1 step each: 72 hours at the default 60s step
        'RRA:AVERAGE:0.5:1:4320',
        # 2016 rows of 5 steps each: one week at the default 60s step
        'RRA:AVERAGE:0.5:5:2016',
    ])

_logger = logging.getLogger('node.stats_user')

# LRU cache of Rrd objects keyed by user (see get_rrd()), sized to 32.
_user_cache = pylru.lrucache(32)
def get_rrd(user):
    """Return the Rrd object for `user`, creating it on first access.

    Created objects are remembered in the module-level `_user_cache`, so
    repeated calls for the same user reuse one Rrd instance while the
    entry stays in the cache.

    :param user:
        user identifier; also used to build the on-disk path
    :returns:
        the cached or newly created `Rrd` instance

    """
    if user not in _user_cache:
        # Cache miss: open the databases with the configured step and RRAs
        created = Rrd(
            _rrd_path(user),
            stats_user_step.value,
            stats_user_rras.value)
        _user_cache[user] = created
        return created
    return _user_cache[user]
def diff(in_info=None):
    """Generate the stream of users' stats records that still need pushing.

    Walks every user's RRD directory below ``<stats_root>/user`` and yields,
    in order:

    * ``{'db': <db name>, 'user': <user>}`` once per database, before its
      records;
    * ``{'timestamp': ..., 'values': ...}`` for each record that
      ``db.get()`` returns for the requested ranges;
    * ``{'commit': out_info}`` as the very last item, where `out_info` maps
      user -> db name -> sequence of the ranges that were actually yielded.

    :param in_info:
        optional mapping user -> db name -> ranges to export; it is updated
        in place. A missing db entry is replaced by a
        `toolkit.PersistentSequence` opened on the ``<db name>.push`` file
        in the user's RRD directory, with ``[1, None]`` as the initial
        value. An entry that is not already a `toolkit.Sequence` (e.g. a
        plain list) is wrapped into one; existing `toolkit.Sequence`
        objects are used as is.
    :returns:
        generator of the dictionaries described above

    """
    if in_info is None:
        in_info = {}
    out_info = {}

    try:
        for user, rrd in _walk_rrd(join(node.stats_root.value, 'user')):
            in_info.setdefault(user, {})
            out_info.setdefault(user, {})

            for db in rrd:
                yield {'db': db.name, 'user': user}

                in_seq = in_info[user].get(db.name)
                if in_seq is None:
                    in_seq = toolkit.PersistentSequence(
                            join(rrd.root, db.name + '.push'), [1, None])
                    in_info[user][db.name] = in_seq
                elif not isinstance(in_seq, toolkit.Sequence):
                    # The previous identity test against the class itself
                    # (`is not toolkit.Sequence`) was true for any instance,
                    # so ready-to-use sequences were needlessly re-wrapped
                    in_seq = in_info[user][db.name] = toolkit.Sequence(in_seq)
                out_seq = out_info[user].setdefault(db.name,
                        toolkit.Sequence())

                # NOTE(review): `in_seq` is modified by `exclude()` while it
                # is being iterated; presumably Sequence tolerates that --
                # confirm against toolkit.Sequence
                for start, end in in_seq:
                    # Clamp the requested range to what the db contains;
                    # an open-ended range (falsy `end`) stops at `db.last`
                    for timestamp, values in \
                            db.get(max(start, db.first), end or db.last):
                        yield {'timestamp': timestamp, 'values': values}
                        # Move the exported span from "pending" to "done"
                        in_seq.exclude(start, timestamp)
                        out_seq.include(start, timestamp)
                        # The next record's span begins at this timestamp
                        start = timestamp
    except StopIteration:
        # A StopIteration escaping from the iteration above only cuts the
        # walk short; the commit record below is still emitted
        pass

    yield {'commit': out_info}
def merge(packet):
    """Apply a stream of records, shaped like `diff()` output, to local RRDs.

    Records are dispatched by the keys they contain:

    * a record with ``'db'`` selects the target database of the record's
      ``'user'`` for the records that follow;
    * a record with ``'commit'`` (and no ``'db'``) supplies the value to
      return;
    * any other record is stored in the currently selected database using
      its ``'values'`` and ``'timestamp'``.

    :param packet:
        iterable of record dictionaries
    :returns:
        the value of the last ``'commit'`` record, or `None` if there was
        none

    The `enforce()` check fails with 'Malformed user stats diff' if a data
    record arrives before any ``'db'`` record.

    """
    committed = None
    current_db = None

    for item in packet:
        if 'db' in item:
            current_db = get_rrd(item['user'])[item['db']]
            continue
        if 'commit' in item:
            committed = item['commit']
            continue
        # Plain data record; it needs a database selected beforehand
        enforce(current_db is not None, 'Malformed user stats diff')
        current_db.put(item['values'], item['timestamp'])

    return committed
def commit(info):
    """Record ranges as pushed in the per-database ``.push`` sequences.

    For every user and database in `info`, the ``<db name>.push``
    `toolkit.PersistentSequence` under the user's RRD path is opened
    (with ``[1, None]`` as the initial value), the given ranges are
    excluded from it and the sequence is committed.

    :param info:
        mapping user -> db name -> ranges to exclude, e.g. the value of
        the ``'commit'`` record produced by `diff()`

    """
    for user, merged_by_db in info.items():
        for db_name, merged in merged_by_db.items():
            push_path = _rrd_path(user, db_name + '.push')
            pending = toolkit.PersistentSequence(push_path, [1, None])
            pending.exclude(merged)
            pending.commit()
def _walk_rrd(root):
    """Yield ``(user, Rrd)`` pairs for every user directory below `root`.

    `root` is expected to hold one level of grouping directories, each
    containing per-user entries. Non-directory entries directly under
    `root` are skipped. Nothing is yielded if `root` does not exist.

    :param root:
        path to the top-level directory with users' stats
    :returns:
        generator of ``(user, Rrd)`` tuples, where `user` is the name of
        the entry and the Rrd is opened on it with the configured step

    """
    if exists(root):
        for group_name in os.listdir(root):
            group_path = join(root, group_name)
            if isdir(group_path):
                for user in os.listdir(group_path):
                    user_rrd = Rrd(join(group_path, user),
                            stats_user_step.value)
                    yield user, user_rrd
def _rrd_path(user, *args):
    """Return a path inside the RRD directory of `user`.

    The user's directory is
    ``<stats_root>/user/<first two characters of user>/<user>``.

    :param user:
        user identifier
    :param args:
        optional path components to append to the user's directory
    :returns:
        the joined path

    """
    user_dir = join(node.stats_root.value, 'user', user[:2], user)
    return join(user_dir, *args)
|