diff options
Diffstat (limited to 'stats_consolidation')
12 files changed, 1118 insertions, 0 deletions
diff --git a/stats_consolidation/__init__.py b/stats_consolidation/__init__.py new file mode 100644 index 0000000..c06c47d --- /dev/null +++ b/stats_consolidation/__init__.py @@ -0,0 +1 @@ +"""stats_consolidation""" diff --git a/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/__init__.py b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/__init__.py new file mode 100644 index 0000000..dd6e324 --- /dev/null +++ b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/__init__.py @@ -0,0 +1 @@ +"""stats_consolidation module """ diff --git a/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/consolidation.py b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/consolidation.py new file mode 100644 index 0000000..4003594 --- /dev/null +++ b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/consolidation.py @@ -0,0 +1,34 @@ +import os +import stats_consolidation +import argparse + +from rrd_files import * +from db import * + +class Consolidation: + + def __init__(self, path, db): + self.base_path = path + self.date_start = db.get_date_last_record() + self.db = db + def process_rrds (self): + id_hash_list = os.listdir(unicode(self.base_path)) + if id_hash_list: + for id_hash in id_hash_list: + user_hash_list = os.listdir( unicode( os.path.join(self.base_path, id_hash) ) ) + if user_hash_list: + for user_hash in user_hash_list: + rrd_list = os.listdir( unicode(os.path.join(self.base_path, id_hash, user_hash)) ) + if rrd_list: + for rrd in rrd_list: + rrd_path = unicode (os.path.join(self.base_path, id_hash, user_hash) ) + rrd_obj = RRD (path=rrd_path, name=rrd, date_start=self.date_start, date_end=None) + self.db.store_activity_uptime(rrd_obj) + else: + print "None rrd file found" + os.path.join(self.base_path, id_hash, user_hash) + else: + print "None hash user found on: " + os.path.join(self.base_path, id_hash) + else: + print "None hash ids found on: " + self.base_path + + diff --git a/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/db.py b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/db.py new file mode 100644 index 0000000..a8939e5 --- /dev/null +++ b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/db.py @@ -0,0 +1,202 @@ +from __future__ import print_function +import mysql.connector +from mysql.connector import errorcode +from datetime import datetime + +class DB_Stats: + TABLES={} + + TABLES['Usages'] = ( + "CREATE TABLE `Usages` (" + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " `user_hash` CHAR(40) NOT NULL," + " `resource_name` CHAR(80)," + " `start_date` TIMESTAMP NOT NULL," + " `data_type` CHAR (30) NOT NULL," + " `data` INTEGER NOT NULL," + " PRIMARY KEY (`user_hash`,`start_date`,`resource_name`, `data_type`)" + " )") + + TABLES['Resources'] = ( + "CREATE TABLE Resources (" + " `name` CHAR(250)," + " PRIMARY KEY (name)" + " )") + + TABLES['Users'] = ( + "CREATE TABLE Users(" + " `hash` CHAR (40) NOT NULL," + " `uuid` CHAR (32) NOT NULL," + " `machine_sn` CHAR(80)," + " `age` INTEGER NOT NULL," + " `school` CHAR(80)," + " `sw_version` CHAR (80)," + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " PRIMARY KEY (hash)" + " )") + + TABLES['Runs'] = ( + "CREATE TABLE Runs(" + " `last_ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP " + ")") + + + + def __init__(self, db_name, user, password): + self.db_name = db_name + self.user = user + self.password = password + + + def create_database(self, cursor): + try: + cursor.execute( + "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(self.db_name)) + except mysql.connector.Error as err: + raise Exception ("Failed creating database: {}".format(err)) + + def create_tables(self, cursor): + for name, ddl in self.TABLES.iteritems(): + try: + print("Creating table {}: ".format(name), end='') + cursor.execute(ddl) + except mysql.connector.Error as err: + if err.errno == errorcode.ER_TABLE_EXISTS_ERROR: + print("already exists.") + else: + raise Exception ("Error: {}".format(err)) + else: + print("OK") + + def create (self): + self.cnx = mysql.connector.connect(user=self.user, password=self.password) + cursor = self.cnx.cursor() + """Try connect to db """ + try: + self.cnx.database = self.db_name + print("DB ["+self.db_name+"] created already, will try create tables:" ) + self.create_tables(cursor) + except mysql.connector.Error as err: + """If db not exist, then create""" + if err.errno == errorcode.ER_BAD_DB_ERROR: + self.create_database(cursor) + self.cnx.database = self.db_name + self.create_tables(cursor) + else: + raise Exception ("Error: {}".format(err)) + cursor.close() + + + + def close (self): + self.cnx.close() + + + + def store_activity_uptime(self, rrd): + + self.store_resource(rrd.get_name()) + self.store_user(rrd) + + cursor = self.cnx.cursor() + insert = ("INSERT INTO Usages " + "(user_hash, " + "resource_name, " + "start_date, " + "data_type, " + "data) " + "VALUES (%s, %s, %s, %s ,%s) ") + + for d in rrd.get_uptime_by_interval(): + info = (rrd.get_user_hash(), rrd.get_name() , datetime.fromtimestamp(float(d[0])), 'uptime', d[1]) + try: + cursor.execute(insert, info) + if self.update_last_record(rrd.get_date_last_record()) == 0: + self.cnx.commit() + + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + cursor.close() + + + def store_resource(self, resource_name): + cursor = self.cnx.cursor() + op = ("SELECT name FROM Resources WHERE name = %s") + params = (resource_name,) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + print("Resource {} already in db".format(resource_name)) + else: + insert = ("INSERT INTO Resources (name) VALUES (%s)") + info = (resource_name, ) + cursor.execute(insert, info) + self.cnx.commit() + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + + cursor.close() + + def store_user (self, rrd): + cursor = self.cnx.cursor() + op = ("SELECT hash FROM Users WHERE hash = %s") + params = (rrd.get_user_hash(), ) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + print("User {} already in db".format(rrd.user_hash)) + else: + """FIXME change hardcoded values """ + insert = ("INSERT INTO Users (hash, uuid, machine_sn, age, school, sw_version) VALUES (%s, %s, %s, %s, %s, %s)") + params = (rrd.get_user_hash(), rrd.get_uuid(), "unk_machine_sn", 0, "unk_escuela", "1.0.0") + cursor.execute(insert, params) + self.cnx.commit() + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + + cursor.close() + + + + def update_last_record (self, ts): + cursor = self.cnx.cursor() + res = 0 + op = ("SELECT * FROM Runs") + params = (datetime.fromtimestamp(float(ts)),) + try: + cursor.execute(op) + result = cursor.fetchone() + + if result != None: + op = ("UPDATE Runs SET last_ts = %s") + cursor.execute(op, params) + self.cnx.commit() + else: + op = ("INSERT INTO Runs VALUES(%s)") + cursor.execute(op, params) + self.cnx.commit() + + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + res = -1 + + cursor.close() + return res + + def get_date_last_record (self): + cursor = self.cnx.cursor() + op = ("SELECT UNIX_TIMESTAMP ((SELECT last_ts FROM Runs))") + try: + cursor.execute(op) + result = cursor.fetchone() + if result != None: + print ("last record: {}".format(result[0])) + return result[0] + else: + print ("Last date record is None") + return 0 + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + cursor.close() diff --git a/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/rrd_files.py b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/rrd_files.py new file mode 100644 index 0000000..a437e0d --- /dev/null +++ b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/rrd_files.py @@ -0,0 +1,133 @@ +import rrdtool +import os +import sys + +class RRD: + + hdr_item = 0 + ds_item = 1 + data_item = 2 + DS = {'active':0, 'buddies':0, 'instances':0, 'new':0, 'resumed':0, 'uptime':0} + + def __init__(self, path, name, date_start, date_end): + + self.rrd_name = name + + if date_start == None: + self.date_start = str(rrdtool.first(str(os.path.join (path,name)))) + else: + self.date_start = str(date_start) + + + if date_end == None: + self.date_end = str(rrdtool.last(str(os.path.join(path, name)))) + else: + self.date_end = str(date_end) + + self.user_hash = os.path.split(path)[1] + + self.user_path = os.path.join ( + self.get_first_part_path(path, 3), + "users", + "user", + self.user_hash[:2], + self.user_hash + ) + + self.uuid = self.get_uuid_from_file(self.user_path) + + + print "*******************************************" + print " RRD " + print "start: " + self.date_start + print "end: " + self.date_end + print "PATH: " + path + print "RRD NAME: " + name + print "\n" + try: + self.rrd = rrdtool.fetch (str(os.path.join(path,name)), 'AVERAGE', '-r 60', '-s '+ self.date_start, '-e '+self.date_end) + except: + raise + + print " DS " + for item in self.DS.keys(): + idx = self.get_ds_index (item) + if idx != -1: + self.DS[item] = idx + print "DS "+ item + ": " + str(self.DS[item]) + else: + print "DS "+ item + " not found in header" + print "***********************************************" + + def get_ds_index(self, ds): + i=0 + for i in range (len (self.rrd[self.ds_item])): + if self.rrd[self.ds_item][i] == ds: + return i + i=+1 + return -1 + + def get_uptime_by_interval (self): + ds_name = "uptime" + res=list() + + print "-------Calcule "+ ds_name +"-------" + i=0 + found = False + while i < len(self.rrd[self.data_item]): + value = str(self.rrd[self.data_item][i][self.DS[ds_name]]) + if value != "None": + uptime = value + end = str (long(self.date_start) + ((i+1) * 60)) + if found == False: + found = True + start = str (long (self.date_start) + ((i+1) * 60)) + else: + if found: + print start + "->" + end + ": " + uptime + if float(uptime) > 0: + res.append((start, uptime)) + found = False + i=i+1 + return res + print "---------------------------------------------------" + + + def get_name(self): + return self.rrd_name.partition(".rrd")[0] + + def show_valid_ds(self, ds_name): + print "------------------- DS "+ ds_name +"---------------------" + i=0 + while i < len(self.rrd[self.data_item]): + timestamp = str (long (self.date_start) + ((i+1) * 60)) + value = str (self.rrd[self.data_item][i][self.DS[ds_name]]) + + if value != "None": + print timestamp+ ": " + value + i=i+1 + print "---------------------------------------------------" + + + def get_date_last_record(self): + return self.date_end + + def set_user_hash(self, u_hash): + self.user_hash = u_hash + + def get_first_part_path (self, path, idx): + l=list() + l.append(path) + for i in range (idx): + l.append(os.path.split(l[i])[0]) + return l[idx] + + def get_uuid_from_file(self,path): + return open (os.path.join(path, "machine_uuid")).next() + + + def get_user_hash(self): + return self.user_hash + + def get_uuid (self): + return self.uuid diff --git a/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/setup.py b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/setup.py new file mode 100644 index 0000000..00585ee --- /dev/null +++ b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/setup.py @@ -0,0 +1,14 @@ +from distutils.core import setup + +setup( + name = "stats_consolidation", + version = "0.1.0", + description = "Statistics translator from rrd to relational db", + author = "Gustavo Duarte", + author_email = "gduarte@activitycentral.com", + url = "http://www.acrtivitycentral.com/", + py_modules=[ + 'consolidation','db','rrd_files' + ], +) + diff --git a/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/test_cons.py b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/test_cons.py new file mode 100644 index 0000000..00f43b5 --- /dev/null +++ b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/test_cons.py @@ -0,0 +1,9 @@ +import stats_consolidation +from stats_consolidation import * + +db = DB_Stats('statistics', 'root', 'gustavo') +db.create(); + +con = Consolidation('/home/gustavo/AC/server_stats/sugar-stats/rrd', db) + +con.process_rrds() diff --git a/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/test_db.py b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/test_db.py new file mode 100644 index 0000000..197510a --- /dev/null +++ b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/test_db.py @@ -0,0 +1,6 @@ +import stats_consolidation +from stats_consolidation import * + +db = DB_Stats('statistics', 'root', 'gustavo') +db.create(); + diff --git a/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/test_rrd.py b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/test_rrd.py new file mode 100644 index 0000000..657ce18 --- /dev/null +++ b/stats_consolidation/build/lib.linux-x86_64-2.7/stats_consolidation/test_rrd.py @@ -0,0 +1,38 @@ +from db import * + +from rrd_files import * +from db import * + + +print "============================== TEST RRD -> Relational DB ========================================" +db = DB_Stats('statistics', 'root', 'gustavo') +db.create() + +DATE_START =datetime(year=2012, + month=12, + day=13, + hour=0, + minute=0, + second=0).strftime("%s") + + +DATE_END = datetime(year=2012, + month=12, + day=14, + hour=0, + minute=0, + second=0).strftime("%s") + +DATE_START = db.get_date_last_record() +DATE_END = datetime.now().strftime("%s") + +act_rrd = RRD (path = "/home/gustavo/AC/consolidation/rrds", name="pippy.rrd", date_start=DATE_START, date_end=DATE_END) +""" +act_rrd.show_valid_ds("uptime") +act_rrd.show_valid_ds("resumed") +act_rrd.show_valid_ds("new") +act_rrd.show_valid_ds("instances") +act_rrd.show_valid_ds("buddies") +""" +data = {} +db.store_activity_uptime(act_rrd) diff --git a/stats_consolidation/consolidation.py b/stats_consolidation/consolidation.py new file mode 100644 index 0000000..fd75143 --- /dev/null +++ b/stats_consolidation/consolidation.py @@ -0,0 +1,52 @@ +import os +import argparse +import logging + +from stats_consolidation.db import * +from stats_consolidation.rrd_files import * + + +log = logging.getLogger("stats-consolidation") +#log = logging.getLogger(__name__) + +class Consolidation: + + def __init__(self, path, db): + self.base_path = path + try: + self.date_start = db.get_date_last_record() + if self.date_start == 0: + self.date_start = None + self.db = db + except Exception as e: + log.error('Exception: %s ', e) + + def process_rrds (self): + id_hash_list = os.listdir(unicode(self.base_path)) + try: + if id_hash_list: + for id_hash in id_hash_list: + user_hash_list = os.listdir( unicode( os.path.join(self.base_path, id_hash) ) ) + if user_hash_list: + for user_hash in user_hash_list: + rrd_list = os.listdir( unicode(os.path.join(self.base_path, id_hash, user_hash)) ) + if rrd_list: + for rrd in rrd_list: + rrd_path = unicode (os.path.join(self.base_path, id_hash, user_hash) ) + try: + rrd_obj = RRD (path=rrd_path, name=rrd, date_start=self.date_start, date_end=None) + self.db.store_activity_uptime(rrd_obj) + self.db.store_activity_focus_time(rrd_obj) + except Exception as e: + log.warning('Exception on RRD object instance: \'%s\'', e) + else: + log.warning('RRD file not found: %s', os.path.join(self.base_path, id_hash, user_hash)) + else: + log.warning('Hash user direcotory not found: %s', os.path.join(self.base_path, id_hash)) + self.db.update_last_record(); + log.info("End RRDs processing") + else: + log.error('Hash ids not found on: %s', self.base_path) + except Exception as e: + log.error('Excpetion processing rrds: \'%s\'', e) + diff --git a/stats_consolidation/db.py b/stats_consolidation/db.py new file mode 100644 index 0000000..d6593a7 --- /dev/null +++ b/stats_consolidation/db.py @@ -0,0 +1,432 @@ +import mysql.connector +import sys, os +from mysql.connector import errorcode + +from datetime import datetime +import logging +from stats_consolidation.rrd_files import * + +log = logging.getLogger("stats-consolidation") + +class DB_Stats: + TABLES={} + + TABLES['Usages'] = ( + "CREATE TABLE `Usages` (" + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " `user_hash` CHAR(40) NOT NULL," + " `resource_name` CHAR(80)," + " `start_date` TIMESTAMP NOT NULL," + " `data_type` CHAR (30) NOT NULL," + " `data` INTEGER NOT NULL," + " PRIMARY KEY (`user_hash`,`start_date`,`resource_name`, `data_type`)" + " )") + + TABLES['Resources'] = ( + "CREATE TABLE Resources (" + " `name` CHAR(250)," + " PRIMARY KEY (name)" + " )") + + TABLES['Users'] = ( + "CREATE TABLE Users(" + " `hash` CHAR (40) NOT NULL," + " `uuid` CHAR (32) NOT NULL," + " `machine_sn` CHAR(80)," + " `age` INTEGER NOT NULL," + " `school` CHAR(80)," + " `sw_version` CHAR (80)," + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " PRIMARY KEY (hash)" + " )") + + TABLES['Runs'] = ( + "CREATE TABLE Runs(" + " `last_ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP " + ")") + + + def __init__(self, db_name, user, password): + self.db_name = db_name + self.user = user + self.password = password + + + def create_database(self, cursor): + try: + cursor.execute( + "CREATE DATABASE {0} DEFAULT CHARACTER SET 'utf8'".format(self.db_name)) + except mysql.connector.Error as err: + raise Exception ("Failed creating database: {0}".format(err)) + + def create_tables(self, cursor): + for name, ddl in self.TABLES.iteritems(): + try: + log.info('Creating table %s:', name) + cursor.execute(ddl) + except mysql.connector.Error as err: + if err.errno == errorcode.ER_TABLE_EXISTS_ERROR: + log.warning('Table %s already exists.', name) + else: + raise Exception ("Error: {0}".format(err)) + else: + log.info('Table %s crated', name) + + def create (self): + self.cnx = mysql.connector.connect(user=self.user, password=self.password) + cursor = self.cnx.cursor() + """Try connect to db """ + try: + self.cnx.database = self.db_name + log.info('Data Base %s already created, will create tables', self.db_name) + self.create_tables(cursor) + except mysql.connector.Error as err: + """If db not exist, then create""" + if err.errno == errorcode.ER_BAD_DB_ERROR: + self.create_database(cursor) + self.cnx.database = self.db_name + self.create_tables(cursor) + else: + raise Exception ("Error: {0}".format(err)) + cursor.close() + + + + + def close (self): + self.cnx.close() + + def connect (self): + try: + self.cnx = mysql.connector.connect(user=self.user, password=self.password) + cursor = self.cnx.cursor() + self.cnx.database = self.db_name + cursor.close() + except mysql.connector.Error as err: + raise Exception ("Error: {0}".format(err)) + + +#========================================================================================================= +# Q U E R I E S S A V E M E T H O D S +#========================================================================================================= + + def store_activity_uptime (self, rrd): + self.store_activity_time (rrd, 'uptime') + + def store_activity_focus_time (self, rrd): + self.store_activity_time(rrd, 'active') + + + + def store_activity_time(self, rrd, data_type): + + self.store_resource(rrd.get_name()) + self.store_user(rrd) + + cursor = self.cnx.cursor() + select = ("SELECT * FROM Usages WHERE " + "user_hash = %s AND " + "resource_name = %s AND " + "start_date = %s AND " + "data_type = %s") + update = ("UPDATE Usages SET data = %s WHERE " + "user_hash = %s AND " + "resource_name = %s AND " + "start_date = %s AND " + "data_type = %s") + insert = ("INSERT INTO Usages " + "(user_hash, " + "resource_name, " + "start_date, " + "data_type, " + "data) " + "VALUES (%s, %s, %s, %s ,%s) ") + + for d in rrd.get_last_value_by_interval(data_type): + info_sel = (rrd.get_user_hash(), rrd.get_name() , datetime.fromtimestamp(float(d[0])), data_type) + try: + """Verify if this activity has an entry already at the same start_date""" + cursor.execute (select, info_sel) + result = cursor.fetchone() + + if result != None: + log.info('Update %s \'%s\' entry for resource \'%s\' ', data_type, d[1], rrd.get_name()) + info_up = (d[1], rrd.get_user_hash(), rrd.get_name() , datetime.fromtimestamp(float(d[0])), data_type) + cursor.execute(update, info_up) + else: + log.info('New %s \'%s\' entry for resource \'%s\'', data_type, d[1], rrd.get_name()) + info_ins = (rrd.get_user_hash(), rrd.get_name() , datetime.fromtimestamp(float(d[0])), data_type, d[1]) + cursor.execute(insert, info_ins) + + self.cnx.commit() + + except mysql.connector.Error as err: + log.error('MySQL on store_activiy_time()%s: %s %s', data_type, cursor.statement, err) + cursor.close() + + + def store_resource(self, resource_name): + cursor = self.cnx.cursor() + op = ("SELECT name FROM Resources WHERE name = %s") + params = (resource_name,) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + log.debug('Resource %s already present in DB', resource_name) + else: + insert = ("INSERT INTO Resources (name) VALUES (%s)") + info = (resource_name, ) + cursor.execute(insert, info) + self.cnx.commit() + except mysql.connector.Error as err: + log.info('MySQL on store_resource: %s %s', cursor.statement, err) + + cursor.close() + + def store_user (self, rrd): + cursor = self.cnx.cursor() + op = ("SELECT hash FROM Users WHERE hash = %s") + params = (rrd.get_user_hash(), ) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + log.debug('User %s already in DB', rrd.user_hash) + else: + insert = ("INSERT INTO Users (hash, uuid, machine_sn, age, school, sw_version) VALUES (%s, %s, %s, %s, %s, %s)") + params = (rrd.get_user_hash(), rrd.get_uuid(), rrd.get_sn(), rrd.get_age(), rrd.get_school(), "1.0.0") + cursor.execute(insert, params) + self.cnx.commit() + except mysql.connector.Error as err: + log.error('MySQL on store_user %s %s', cursor.statement, err) + + cursor.close() + + + + def update_last_record (self): + cursor = self.cnx.cursor() + res = 0 + op = ("SELECT * FROM Runs") + try: + cursor.execute(op) + result = cursor.fetchone() + + if result != None: + op = ("UPDATE Runs SET last_ts = CURRENT_TIMESTAMP") + cursor.execute(op) + self.cnx.commit() + else: + op = ("INSERT INTO Runs VALUES(CURRENT_TIMESTAMP)") + cursor.execute(op) + self.cnx.commit() + log.info("Save last record"); + except mysql.connector.Error as err: + log.error('MySQL on update_last_record: %s %s', cursor.statement, err) + res = -1 + + cursor.close() + return res + + def get_date_last_record (self): + cursor = self.cnx.cursor() + op = ("SELECT UNIX_TIMESTAMP ((SELECT last_ts FROM Runs))") + try: + cursor.execute(op) + result = cursor.fetchone() + if result != None and result[0] != None: + log.info('Last record: %s', str(datetime.fromtimestamp (float (result[0])))) + return result[0] + else: + log.info('Last date record is None') + return 0 + except mysql.connector.Error as err: + log.error('MySQL on get_date_last_record: %s %s',cursor.statement, err) + except Exception as e: + raise Exception ("get_date_last_record: {0}".format(e)) + cursor.close() + + + + + +#========================================================================================================= +# R E P O R T M E T H O D S +#========================================================================================================= + def rep_activity_time (self, start, end, activity, school=None): + uptime_last=0 + activity_name='' + focus = 0 + uptime = 0 + ts_start = self.date_to_ts(start) + ts_end = self.date_to_ts(end) + + cursor1 = self.cnx.cursor() + cursor2 = self.cnx.cursor() + try: + if school != None: + select_usage = "SELECT SUM(data) FROM Usages WHERE (resource_name = %s) AND (start_date > %s) AND (start_date < %s) AND (data_type = %s) AND (user_hash = %s)" + + log.debug('Activiy time by school: %s', school) + """ Get user hash from a School""" + cursor1.execute ("SELECT hash FROM Users WHERE school = %s", (school,)) + user_hashes = cursor1.fetchall() + for user_hash in user_hashes: + log.debug('user Hash: %s', user_hash[0]) + params_focus = (activity, ts_start, ts_end, 'active', user_hash[0]) + params_uptime = (activity, ts_start, ts_end, 'uptime', user_hash[0]) + + cursor2.execute(select_usage, params_focus) + focus = float (cursor2.fetchone()[0]) + focus + cursor2.execute(select_usage, params_uptime) + uptime = float (cursor2.fetchone()[0]) + uptime + + else: + select_usage = "SELECT SUM(data) FROM Usages WHERE (resource_name = %s) AND (start_date > %s) AND (start_date < %s) AND (data_type = %s)" + params_focus = (activity, ts_start, ts_end, 'active') + params_uptime = (activity, ts_start, ts_end, 'uptime') + cursor2.execute(select_usage, params_focus) + focus = float(cursor2.fetchone()[0]) + cursor2.execute(select_usage, params_uptime) + uptime = float(cursor2.fetchone()[0]) + + log.debug('Times of (%s) from: %s -> %s: Uptime: %s, Focus: %s', activity, start, end, uptime, focus) + + + cursor1.close() + cursor2.close() + return (uptime, focus) + except mysql.connector.Error as err: + log.error('MySQL on rep_activity_time %s', err) + except Exception as e: + log.error('MySQL on rep_activity_time : %s', e) + return (None, None) + + + + def rep_get_activities (self, start, end, school=None, desktop='any'): + res_list = list(); + cursor1 = self.cnx.cursor() + cursor2 = self.cnx.cursor() + cursor3 = self.cnx.cursor() + + if desktop == 'gnome': + cursor2.execute("SELECT name FROM Resources WHERE name REGEXP 'application'") + elif desktop == 'sugar': + cursor2.execute("SELECT name FROM Resources WHERE name REGEXP 'activity'") + else: + cursor2.execute("SELECT name FROM Resources") + + resources = cursor2.fetchall() + + try: + if school != None: + log.debug('Most activiy used by school: %s', school) + """ Get user hash from a School""" + cursor1.execute ("SELECT hash FROM Users WHERE school = %s", (school,)) + user_hashes = cursor1.fetchall() + """ Cursor for select resources from Uages table""" + select_usage = "SELECT SUM(data) FROM Usages WHERE (resource_name = %s) AND (start_date > %s) AND (start_date < %s) AND (data_type = 'active') AND (user_hash = %s)" + else: + log.debug('Most activiy used') + """ Cursor for select resources from Uages table""" + select_usage = "SELECT SUM(data) FROM Usages WHERE (resource_name = %s) AND (start_date > %s) AND (start_date < %s) AND (data_type = 'active')" + + + + ts_start = self.date_to_ts(start) + ts_end = self.date_to_ts(end) + + + for resource in resources: + log.debug('Resource: %s', resource[0]) + if self.is_an_activity (resource[0]): + if school != None: + for user_hash in user_hashes: + log.debug('user Hash: %s', user_hash[0]) + cursor3.execute(select_usage, (resource[0], ts_start, ts_end, user_hash[0])) + focus = cursor3.fetchone()[0] + if focus == None: focus = 0 + + log.debug('Focus time: %s', focus) + res_list.append((resource[0], focus)) + else: + cursor3.execute(select_usage, (resource[0], ts_start, ts_end)) + focus = cursor3.fetchone()[0] + if focus == None: focus = 0 + log.debug('Focus time: %s', focus ) + res_list.append((resource[0], focus)) + + except mysql.connector.Error as err: + log.error('MySQL on most_activity_used %s', err) + except Exception as e: + log.error('most_activity_used Fail: %s', e) + cursor1.close() + cursor2.close() + cursor3.close() + log.debug ('Activities: %s', sorted(res_list, key=lambda x: x[1], reverse=True)) + return sorted(res_list, key=lambda x: x[1], reverse=True) + + + + def rep_frequency_usage (self, start, end, school=None): + cursor1 = self.cnx.cursor() + cursor2 = self.cnx.cursor() + user_hashes=() + time = 0 + try: + ts_start = self.date_to_ts(start) + ts_end = self.date_to_ts(end) + + if school != None: + log.debug('Frequency usage by school: %s', school) + """ Get user hash from a School""" + cursor1.execute ("SELECT hash FROM Users WHERE school = %s", (school,)) + user_hashes = cursor1.fetchall() + + for user_hash in user_hashes: + cursor2.execute("SELECT SUM(data) FROM Usages WHERE (resource_name = 'system') AND (start_date > %s) AND (start_date < %s) AND (data_type = 'uptime') AND (user_hash = %s)", (ts_start, ts_end, user_hash[0])) + res = cursor2.fetchone() + if res != None and res[0] != None: + time = float (res[0]) + time + else: + log.debug('Frequency usage') + cursor1.execute ("SELECT hash FROM Users") + user_hashes = cursor1.fetchall() + cursor2.execute("SELECT SUM(data) FROM Usages WHERE (resource_name = 'system') AND (start_date > %s) AND (start_date < %s) AND (data_type = 'uptime')", (ts_start, ts_end)) + time = cursor2.fetchone()[0] + + return (time, len(user_hashes)) + + + except mysql.connector.Error as err: + log.error("MySQL on %s: %s", cursor.statement, err) + cursor1.close() + cursor2.close() + + + def rep_update_school(self, machine_sn, school): + cursor = self.cnx.cursor() + try: + log.debug("Set school name: %s to user with machine_sn: %s", school, machine_sn) + cursor.execute ("UPDATE Users SET school = %s WHERE machine_sn = %s", (school, machine_sn)) + except mysql.connector.Error as err: + log.error("MySQL on %s: %s", cursor.statement, err) + + cursor.close() + +#========================================================================================================= +# A U X I L I A R M E T H O D S +#========================================================================================================= + def is_an_activity(self, name): + if (name != 'system') and (name != 'journal') and (name != 'network') and (name != 'shell'): + return True + else: + return False + + def date_to_ts(self, date): + return datetime.strptime(date, "%Y-%m-%d") + + + diff --git a/stats_consolidation/rrd_files.py b/stats_consolidation/rrd_files.py new file mode 100644 index 0000000..be13ae0 --- /dev/null +++ b/stats_consolidation/rrd_files.py @@ -0,0 +1,196 @@ +import os +import sys +import logging +import rrdtool +from datetime import datetime + + +#log = logging.getLogger(__name__) +log = logging.getLogger("stats-consolidation") + + +class RRD: + + hdr_item = 0 + ds_item = 1 + data_item = 2 + DS = {'active':0, 'buddies':0, 'instances':0, 'new':0, 'resumed':0, 'uptime':0} + + def __init__(self, path, name, date_start=None, date_end=None): + + self.rrd_name = name + + if date_start == None or (date_start > rrdtool.last(str(os.path.join(path, name)))): + self.date_start = str(rrdtool.first(str(os.path.join (path,name)))) + else: + self.date_start = str(date_start) + + if date_end == None: + self.date_end = str(rrdtool.last(str(os.path.join(path, name)))) + else: + self.date_end = str(date_end) + + if float (self.date_start) > float(self.date_end): + raise Exception("Invalid date_start={0} and date_end={1}".format( str(datetime.fromtimestamp(float(self.date_start))), str(datetime.fromtimestamp(float(self.date_end))))) + + self.user_hash = os.path.split(path)[1] + + self.user_path = os.path.join ( + self.get_first_part_path(path, 3), + "users", + "user", + self.user_hash[:2], + self.user_hash + ) + + self.uuid = self.get_uuid_from_file(self.user_path) + self.age = 0 + self.sn = self.get_machine_sn_from_file(self.user_path) + self.school = "unkown" + + + + + print "******************************************" + print " creating a RRD instance " + print "start: " + str(datetime.fromtimestamp(float(self.date_start))) + print "end: " + str(datetime.fromtimestamp(float(self.date_end))) + print "PATH: " + path + print "RRD NAME: " + name + print "\n" + try: + self.rrd = rrdtool.fetch (str(os.path.join(path,name)), 'AVERAGE', '-r 60', '-s '+ self.date_start, '-e '+ self.date_end) + except Exception as e: + raise Exception("rrdtool.fetch: {0}".format(e)) + print " DS " + for item in self.DS.keys(): + idx = self.get_ds_index (item) + if idx != -1: + self.DS[item] = idx + print "DS "+ item + ": " + str(self.DS[item]) + else: + print "DS "+ item + " not found in header" + print "***********************************************" + + def get_ds_index(self, ds): + i=0 + for i in range (len (self.rrd[self.ds_item])): + if self.rrd[self.ds_item][i] == ds: + return i + i=+1 + return -1 + + """ + Find several valid record consecutives, the last one is time of the interval. + Return: a list (start_time, total_time) + """ + def get_last_value_by_interval (self, ds_name): + res=list() + prev_value = 0.0 + i=0 + found = False + + print "-------Calcule "+ ds_name +"-------" + while i < len(self.rrd[self.data_item]): + value = str(self.rrd[self.data_item][i][self.DS[ds_name]]) + + if (value != "None") and (float(value) > 0) and (float(value) >= float(prev_value)): + prev_value = value + end = long(self.date_start) + ((i+1) * 60) + if found == False: + found = True + start = long (self.date_start) + ((i+1) * 60) + else: + if found: + if self.verify_interrupt(i, ds_name, prev_value): + print str(datetime.fromtimestamp(float(start))) + " -> " + str(datetime.fromtimestamp(float(end))) + ": " + prev_value + res.append((start, prev_value)) + found = False + prev_value = 0.0 + i=i+1 + return res + print "---------------------------------------------------" + + + def get_active_by_interval (self): + return self.get_last_value_by_interval ("active") + + def get_uptime_by_interval (self): + return self.get_last_value_by_interval ("uptime") + + def get_name(self): + return self.rrd_name.partition(".rrd")[0] + + def show_valid_ds(self, ds_name): + print "------------------- DS "+ ds_name +"---------------------" + i=0 + while i < len(self.rrd[self.data_item]): + timestamp = str (long (self.date_start) + ((i+1) * 60)) + value = str (self.rrd[self.data_item][i][self.DS[ds_name]]) + + if value != "None": + print str(datetime.fromtimestamp(float(timestamp))) + " (" + timestamp + ")" + ": " + value + i=i+1 + print "---------------------------------------------------" + + + def get_date_last_record(self): + return self.date_end + + def set_user_hash(self, u_hash): + self.user_hash = u_hash + + def get_first_part_path (self, path, idx): + l=list() + l.append(path) + for i in range (idx): + l.append(os.path.split(l[i])[0]) + return l[idx] + + def get_uuid_from_file(self,path): + return open (os.path.join(path, "machine_uuid")).next().strip("\"") + + def get_machine_sn_from_file(self,path): + return open (os.path.join(path, "machine_sn")).next().strip("\"") + + + def get_user_hash(self): + return self.user_hash + + def get_uuid (self): + return self.uuid + + def get_sn (self): + return self.sn + + def get_school (self): + return self.school + + def get_age (self): + return self.age + + + """ + For some reason, sometimes for a while activity is running, statistics library register several values as None. + To detect this behavoir, this function look-up over next records time, and verify if the value is grater than + last valid value + (interval_numb * 60). If the value es greater, means the activity still running else + the activity was stopped and starting again. + """ + def verify_interrupt(self, idx, ds_name, prev_value): + i = idx + j = 0 + while i < len(self.rrd[self.data_item]): + value = str(self.rrd[self.data_item][i][self.DS[ds_name]]) + if value != "None": + """ + print "["+str(j)+ "] current value: " + value + " prev value: " + str (float (prev_value) + (60 * j)) + " ("+ prev_value+")" + """ + if float(value) > (float (prev_value) + (60 * j)): + return False + else: + return True + i=i+1 + j=j+1 + + return True + |