Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/node/stats.py
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2012-08-09 16:59:50 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2012-08-09 17:30:56 (GMT)
commitf81fee8335345b025610fd8789ee19c03d9e15f4 (patch)
tree6d0a0fb5013ee204494e1c34fcf6efb6cbede4be /sugar_network/node/stats.py
parentc8ce1a074715d809dd3ec435978113e5b0c30ed7 (diff)
Process stats in node sync
Diffstat (limited to 'sugar_network/node/stats.py')
-rw-r--r--sugar_network/node/stats.py69
1 files changed, 69 insertions, 0 deletions
diff --git a/sugar_network/node/stats.py b/sugar_network/node/stats.py
index 0f6a049..817def5 100644
--- a/sugar_network/node/stats.py
+++ b/sugar_network/node/stats.py
@@ -13,7 +13,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import logging
+from os.path import join, exists, isdir
+
+from pylru import lrucache
+
from active_toolkit.options import Option
+from sugar_network.toolkit.rrd import Rrd, ReadOnlyRrd
+from sugar_network.toolkit.collection import Sequence, PersistentSequence
stats = Option(
@@ -32,3 +40,64 @@ stats_rras = Option(
'space separated list of RRAs for RRD databases',
default=['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'],
type_cast=Option.list_cast, type_repr=Option.list_repr)
+
+
+_logger = logging.getLogger('node.stats')
+_cache = lrucache(32)
+
+
+def get_rrd(user):
+ if user in _cache:
+ return _cache[user]
+ else:
+ rrd = _cache[user] = Rrd(join(stats_root.value, user[:2], user),
+ stats_step.value, stats_rras.value)
+ return rrd
+
+
+def pull(in_seq, packet):
+ for user, rrd in _walk_rrd():
+ in_seq.setdefault(user, {})
+
+ for db, db_start, db_end in rrd.dbs:
+ seq = in_seq[user].get(db)
+ if seq is None:
+ seq = in_seq[user][db] = PersistentSequence(
+ join(rrd.root, db + '.push'), [1, None])
+ elif seq is not dict:
+ seq = in_seq[user][db] = Sequence(seq)
+ out_seq = Sequence()
+
+ def dump():
+ for start, end in seq:
+ for timestamp, values in \
+ rrd.get(db, max(start, db_start), end or db_end):
+ yield {'timestamp': timestamp, 'values': values}
+ seq.exclude(start, timestamp)
+ out_seq.include(start, timestamp)
+ start = timestamp
+
+ packet.push(dump(), arcname=join('stats', user, db),
+ cmd='stats_push', user=user, db=db,
+ sequence=out_seq)
+
+
+def commit(sequences):
+ for user, dbs in sequences.items():
+ for db, merged in dbs.items():
+ seq = PersistentSequence(
+ join(stats_root.value, user[:2], user, db + '.push'),
+ [1, None])
+ seq.exclude(merged)
+ seq.commit()
+
+
+def _walk_rrd():
+ if not exists(stats_root.value):
+ return
+ for users_dirname in os.listdir(stats_root.value):
+ users_dir = join(stats_root.value, users_dirname)
+ if not isdir(users_dir):
+ continue
+ for user in os.listdir(users_dir):
+ yield user, ReadOnlyRrd(join(users_dir, user))