diff options
author | Gustavo Duarte <gduarte@activitycentral.com> | 2012-12-26 20:13:42 (GMT) |
---|---|---|
committer | Gustavo Duarte <gduarte@activitycentral.com> | 2012-12-26 20:13:42 (GMT) |
commit | 084262ea16b2b352251065dc63a5bb19fcba93f7 (patch) | |
tree | 9d736e8fbea20109e2ac5a25153f4396e1a3cb43 | |
parent | 28a781c4f0e061a2c693554302495e62d8c7253f (diff) |
add setup.py for distribute the package
28 files changed, 1703 insertions, 9 deletions
diff --git a/build/lib.linux-x86_64-2.7/stats_consolidation/__init__.py b/build/lib.linux-x86_64-2.7/stats_consolidation/__init__.py new file mode 100644 index 0000000..c06c47d --- /dev/null +++ b/build/lib.linux-x86_64-2.7/stats_consolidation/__init__.py @@ -0,0 +1 @@ +"""stats_consolidation""" diff --git a/build/lib.linux-x86_64-2.7/stats_consolidation/consolidation.py b/build/lib.linux-x86_64-2.7/stats_consolidation/consolidation.py new file mode 100644 index 0000000..01d8b84 --- /dev/null +++ b/build/lib.linux-x86_64-2.7/stats_consolidation/consolidation.py @@ -0,0 +1,35 @@ +import os +import argparse + +import rrd_files +import db +from rrd_files import * +from db import * + +class Consolidation: + + def __init__(self, path, db): + self.base_path = path + self.date_start = db.get_date_last_record() + self.db = db + def process_rrds (self): + id_hash_list = os.listdir(unicode(self.base_path)) + if id_hash_list: + for id_hash in id_hash_list: + user_hash_list = os.listdir( unicode( os.path.join(self.base_path, id_hash) ) ) + if user_hash_list: + for user_hash in user_hash_list: + rrd_list = os.listdir( unicode(os.path.join(self.base_path, id_hash, user_hash)) ) + if rrd_list: + for rrd in rrd_list: + rrd_path = unicode (os.path.join(self.base_path, id_hash, user_hash) ) + rrd_obj = RRD (path=rrd_path, name=rrd, date_start=self.date_start, date_end=None) + self.db.store_activity_uptime(rrd_obj) + else: + print "None rrd file found" + os.path.join(self.base_path, id_hash, user_hash) + else: + print "None hash user found on: " + os.path.join(self.base_path, id_hash) + else: + print "None hash ids found on: " + self.base_path + + diff --git a/build/lib.linux-x86_64-2.7/stats_consolidation/db.py b/build/lib.linux-x86_64-2.7/stats_consolidation/db.py new file mode 100644 index 0000000..ddc1006 --- /dev/null +++ b/build/lib.linux-x86_64-2.7/stats_consolidation/db.py @@ -0,0 +1,206 @@ +from __future__ import print_function +import mysql.connector +from mysql.connector import errorcode +from datetime import datetime + +import stats_consolidation.rrd_files + +from rrd_files import * + +class DB_Stats: + TABLES={} + + TABLES['Usages'] = ( + "CREATE TABLE `Usages` (" + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " `user_hash` CHAR(40) NOT NULL," + " `resource_name` CHAR(80)," + " `start_date` TIMESTAMP NOT NULL," + " `data_type` CHAR (30) NOT NULL," + " `data` INTEGER NOT NULL," + " PRIMARY KEY (`user_hash`,`start_date`,`resource_name`, `data_type`)" + " )") + + TABLES['Resources'] = ( + "CREATE TABLE Resources (" + " `name` CHAR(250)," + " PRIMARY KEY (name)" + " )") + + TABLES['Users'] = ( + "CREATE TABLE Users(" + " `hash` CHAR (40) NOT NULL," + " `uuid` CHAR (32) NOT NULL," + " `machine_sn` CHAR(80)," + " `age` INTEGER NOT NULL," + " `school` CHAR(80)," + " `sw_version` CHAR (80)," + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " PRIMARY KEY (hash)" + " )") + + TABLES['Runs'] = ( + "CREATE TABLE Runs(" + " `last_ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP " + ")") + + + + def __init__(self, db_name, user, password): + self.db_name = db_name + self.user = user + self.password = password + + + def create_database(self, cursor): + try: + cursor.execute( + "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(self.db_name)) + except mysql.connector.Error as err: + raise Exception ("Failed creating database: {}".format(err)) + + def create_tables(self, cursor): + for name, ddl in self.TABLES.iteritems(): + try: + print("Creating table {}: ".format(name), end='') + cursor.execute(ddl) + except mysql.connector.Error as err: + if err.errno == errorcode.ER_TABLE_EXISTS_ERROR: + print("already exists.") + else: + raise Exception ("Error: {}".format(err)) + else: + print("OK") + + def create (self): + self.cnx = mysql.connector.connect(user=self.user, password=self.password) + cursor = self.cnx.cursor() + """Try connect to db """ + try: + self.cnx.database = self.db_name + print("DB ["+self.db_name+"] created already, will try create tables:" ) + self.create_tables(cursor) + except mysql.connector.Error as err: + """If db not exist, then create""" + if err.errno == errorcode.ER_BAD_DB_ERROR: + self.create_database(cursor) + self.cnx.database = self.db_name + self.create_tables(cursor) + else: + raise Exception ("Error: {}".format(err)) + cursor.close() + + + + def close (self): + self.cnx.close() + + + + def store_activity_uptime(self, rrd): + + self.store_resource(rrd.get_name()) + self.store_user(rrd) + + cursor = self.cnx.cursor() + insert = ("INSERT INTO Usages " + "(user_hash, " + "resource_name, " + "start_date, " + "data_type, " + "data) " + "VALUES (%s, %s, %s, %s ,%s) ") + + for d in rrd.get_uptime_by_interval(): + info = (rrd.get_user_hash(), rrd.get_name() , datetime.fromtimestamp(float(d[0])), 'uptime', d[1]) + try: + cursor.execute(insert, info) + if self.update_last_record(rrd.get_date_last_record()) == 0: + self.cnx.commit() + + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + cursor.close() + + + def store_resource(self, resource_name): + cursor = self.cnx.cursor() + op = ("SELECT name FROM Resources WHERE name = %s") + params = (resource_name,) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + print("Resource {} already in db".format(resource_name)) + else: + insert = ("INSERT INTO Resources (name) VALUES (%s)") + info = (resource_name, ) + cursor.execute(insert, info) + self.cnx.commit() + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + + cursor.close() + + def store_user (self, rrd): + cursor = self.cnx.cursor() + op = ("SELECT hash FROM Users WHERE hash = %s") + params = (rrd.get_user_hash(), ) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + print("User {} already in db".format(rrd.user_hash)) + else: + """FIXME change hardcoded values """ + insert = ("INSERT INTO Users (hash, uuid, machine_sn, age, school, sw_version) VALUES (%s, %s, %s, %s, %s, %s)") + params = (rrd.get_user_hash(), rrd.get_uuid(), "unk_machine_sn", 0, "unk_escuela", "1.0.0") + cursor.execute(insert, params) + self.cnx.commit() + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + + cursor.close() + + + + def update_last_record (self, ts): + cursor = self.cnx.cursor() + res = 0 + op = ("SELECT * FROM Runs") + params = (datetime.fromtimestamp(float(ts)),) + try: + cursor.execute(op) + result = cursor.fetchone() + + if result != None: + op = ("UPDATE Runs SET last_ts = %s") + cursor.execute(op, params) + self.cnx.commit() + else: + op = ("INSERT INTO Runs VALUES(%s)") + cursor.execute(op, params) + self.cnx.commit() + + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + res = -1 + + cursor.close() + return res + + def get_date_last_record (self): + cursor = self.cnx.cursor() + op = ("SELECT UNIX_TIMESTAMP ((SELECT last_ts FROM Runs))") + try: + cursor.execute(op) + result = cursor.fetchone() + if result != None: + print ("last record: {}".format(result[0])) + return result[0] + else: + print ("Last date record is None") + return 0 + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + cursor.close() diff --git a/build/lib.linux-x86_64-2.7/stats_consolidation/rrd_files.py b/build/lib.linux-x86_64-2.7/stats_consolidation/rrd_files.py new file mode 100644 index 0000000..a437e0d --- /dev/null +++ b/build/lib.linux-x86_64-2.7/stats_consolidation/rrd_files.py @@ -0,0 +1,133 @@ +import rrdtool +import os +import sys + +class RRD: + + hdr_item = 0 + ds_item = 1 + data_item = 2 + DS = {'active':0, 'buddies':0, 'instances':0, 'new':0, 'resumed':0, 'uptime':0} + + def __init__(self, path, name, date_start, date_end): + + self.rrd_name = name + + if date_start == None: + self.date_start = str(rrdtool.first(str(os.path.join (path,name)))) + else: + self.date_start = str(date_start) + + + if date_end == None: + self.date_end = str(rrdtool.last(str(os.path.join(path, name)))) + else: + self.date_end = str(date_end) + + self.user_hash = os.path.split(path)[1] + + self.user_path = os.path.join ( + self.get_first_part_path(path, 3), + "users", + "user", + self.user_hash[:2], + self.user_hash + ) + + self.uuid = self.get_uuid_from_file(self.user_path) + + + print "*******************************************" + print " RRD " + print "start: " + self.date_start + print "end: " + self.date_end + print "PATH: " + path + print "RRD NAME: " + name + print "\n" + try: + self.rrd = rrdtool.fetch (str(os.path.join(path,name)), 'AVERAGE', '-r 60', '-s '+ self.date_start, '-e '+self.date_end) + except: + raise + + print " DS " + for item in self.DS.keys(): + idx = self.get_ds_index (item) + if idx != -1: + self.DS[item] = idx + print "DS "+ item + ": " + str(self.DS[item]) + else: + print "DS "+ item + " not found in header" + print "***********************************************" + + def get_ds_index(self, ds): + i=0 + for i in range (len (self.rrd[self.ds_item])): + if self.rrd[self.ds_item][i] == ds: + return i + i=+1 + return -1 + + def get_uptime_by_interval (self): + ds_name = "uptime" + res=list() + + print "-------Calcule "+ ds_name +"-------" + i=0 + found = False + while i < len(self.rrd[self.data_item]): + value = str(self.rrd[self.data_item][i][self.DS[ds_name]]) + if value != "None": + uptime = value + end = str (long(self.date_start) + ((i+1) * 60)) + if found == False: + found = True + start = str (long (self.date_start) + ((i+1) * 60)) + else: + if found: + print start + "->" + end + ": " + uptime + if float(uptime) > 0: + res.append((start, uptime)) + found = False + i=i+1 + return res + print "---------------------------------------------------" + + + def get_name(self): + return self.rrd_name.partition(".rrd")[0] + + def show_valid_ds(self, ds_name): + print "------------------- DS "+ ds_name +"---------------------" + i=0 + while i < len(self.rrd[self.data_item]): + timestamp = str (long (self.date_start) + ((i+1) * 60)) + value = str (self.rrd[self.data_item][i][self.DS[ds_name]]) + + if value != "None": + print timestamp+ ": " + value + i=i+1 + print "---------------------------------------------------" + + + def get_date_last_record(self): + return self.date_end + + def set_user_hash(self, u_hash): + self.user_hash = u_hash + + def get_first_part_path (self, path, idx): + l=list() + l.append(path) + for i in range (idx): + l.append(os.path.split(l[i])[0]) + return l[idx] + + def get_uuid_from_file(self,path): + return open (os.path.join(path, "machine_uuid")).next() + + + def get_user_hash(self): + return self.user_hash + + def get_uuid (self): + return self.uuid diff --git a/build/lib.linux-x86_64-2.7/stats_consolidation/setup.py b/build/lib.linux-x86_64-2.7/stats_consolidation/setup.py new file mode 100644 index 0000000..00585ee --- /dev/null +++ b/build/lib.linux-x86_64-2.7/stats_consolidation/setup.py @@ -0,0 +1,14 @@ +from distutils.core import setup + +setup( + name = "stats_consolidation", + version = "0.1.0", + description = "Statistics translator from rrd to relational db", + author = "Gustavo Duarte", + author_email = "gduarte@activitycentral.com", + url = "http://www.acrtivitycentral.com/", + py_modules=[ + 'consolidation','db','rrd_files' + ], +) + diff --git a/build/lib.linux-x86_64-2.7/stats_consolidation/test_cons.py b/build/lib.linux-x86_64-2.7/stats_consolidation/test_cons.py new file mode 100644 index 0000000..00f43b5 --- /dev/null +++ b/build/lib.linux-x86_64-2.7/stats_consolidation/test_cons.py @@ -0,0 +1,9 @@ +import stats_consolidation +from stats_consolidation import * + +db = DB_Stats('statistics', 'root', 'gustavo') +db.create(); + +con = Consolidation('/home/gustavo/AC/server_stats/sugar-stats/rrd', db) + +con.process_rrds() diff --git a/build/lib.linux-x86_64-2.7/stats_consolidation/test_db.py b/build/lib.linux-x86_64-2.7/stats_consolidation/test_db.py new file mode 100644 index 0000000..197510a --- /dev/null +++ b/build/lib.linux-x86_64-2.7/stats_consolidation/test_db.py @@ -0,0 +1,6 @@ +import stats_consolidation +from stats_consolidation import * + +db = DB_Stats('statistics', 'root', 'gustavo') +db.create(); + diff --git a/build/lib.linux-x86_64-2.7/stats_consolidation/test_rrd.py b/build/lib.linux-x86_64-2.7/stats_consolidation/test_rrd.py new file mode 100644 index 0000000..657ce18 --- /dev/null +++ b/build/lib.linux-x86_64-2.7/stats_consolidation/test_rrd.py @@ -0,0 +1,38 @@ +from db import * + +from rrd_files import * +from db import * + + +print "============================== TEST RRD -> Relational DB ========================================" +db = DB_Stats('statistics', 'root', 'gustavo') +db.create() + +DATE_START =datetime(year=2012, + month=12, + day=13, + hour=0, + minute=0, + second=0).strftime("%s") + + +DATE_END = datetime(year=2012, + month=12, + day=14, + hour=0, + minute=0, + second=0).strftime("%s") + +DATE_START = db.get_date_last_record() +DATE_END = datetime.now().strftime("%s") + +act_rrd = RRD (path = "/home/gustavo/AC/consolidation/rrds", name="pippy.rrd", date_start=DATE_START, date_end=DATE_END) +""" +act_rrd.show_valid_ds("uptime") +act_rrd.show_valid_ds("resumed") +act_rrd.show_valid_ds("new") +act_rrd.show_valid_ds("instances") +act_rrd.show_valid_ds("buddies") +""" +data = {} +db.store_activity_uptime(act_rrd) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..3a2e344 --- /dev/null +++ b/setup.py @@ -0,0 +1,15 @@ +from distutils.core import setup + +setup( + name = "stats_consolidation", + version = "0.1.0", + description = "Statistics translator from rrd to relational db", + author = "Gustavo Duarte", + author_email = "gduarte@activitycentral.com", + url = "http://www.acrtivitycentral.com/", + packages=[ + 'stats_consolidation', + ], + package_dir={'': 'src'} +) + diff --git a/src/build/lib.linux-x86_64-2.7/stats_consolidation/__init__.py b/src/build/lib.linux-x86_64-2.7/stats_consolidation/__init__.py new file mode 100644 index 0000000..dd6e324 --- /dev/null +++ b/src/build/lib.linux-x86_64-2.7/stats_consolidation/__init__.py @@ -0,0 +1 @@ +"""stats_consolidation module """ diff --git a/src/build/lib.linux-x86_64-2.7/stats_consolidation/consolidation.py b/src/build/lib.linux-x86_64-2.7/stats_consolidation/consolidation.py new file mode 100644 index 0000000..4003594 --- /dev/null +++ b/src/build/lib.linux-x86_64-2.7/stats_consolidation/consolidation.py @@ -0,0 +1,34 @@ +import os +import stats_consolidation +import argparse + +from rrd_files import * +from db import * + +class Consolidation: + + def __init__(self, path, db): + self.base_path = path + self.date_start = db.get_date_last_record() + self.db = db + def process_rrds (self): + id_hash_list = os.listdir(unicode(self.base_path)) + if id_hash_list: + for id_hash in id_hash_list: + user_hash_list = os.listdir( unicode( os.path.join(self.base_path, id_hash) ) ) + if user_hash_list: + for user_hash in user_hash_list: + rrd_list = os.listdir( unicode(os.path.join(self.base_path, id_hash, user_hash)) ) + if rrd_list: + for rrd in rrd_list: + rrd_path = unicode (os.path.join(self.base_path, id_hash, user_hash) ) + rrd_obj = RRD (path=rrd_path, name=rrd, date_start=self.date_start, date_end=None) + self.db.store_activity_uptime(rrd_obj) + else: + print "None rrd file found" + os.path.join(self.base_path, id_hash, user_hash) + else: + print "None hash user found on: " + os.path.join(self.base_path, id_hash) + else: + print "None hash ids found on: " + self.base_path + + diff --git a/src/build/lib.linux-x86_64-2.7/stats_consolidation/db.py b/src/build/lib.linux-x86_64-2.7/stats_consolidation/db.py new file mode 100644 index 0000000..a8939e5 --- /dev/null +++ b/src/build/lib.linux-x86_64-2.7/stats_consolidation/db.py @@ -0,0 +1,202 @@ +from __future__ import print_function +import mysql.connector +from mysql.connector import errorcode +from datetime import datetime + +class DB_Stats: + TABLES={} + + TABLES['Usages'] = ( + "CREATE TABLE `Usages` (" + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " `user_hash` CHAR(40) NOT NULL," + " `resource_name` CHAR(80)," + " `start_date` TIMESTAMP NOT NULL," + " `data_type` CHAR (30) NOT NULL," + " `data` INTEGER NOT NULL," + " PRIMARY KEY (`user_hash`,`start_date`,`resource_name`, `data_type`)" + " )") + + TABLES['Resources'] = ( + "CREATE TABLE Resources (" + " `name` CHAR(250)," + " PRIMARY KEY (name)" + " )") + + TABLES['Users'] = ( + "CREATE TABLE Users(" + " `hash` CHAR (40) NOT NULL," + " `uuid` CHAR (32) NOT NULL," + " `machine_sn` CHAR(80)," + " `age` INTEGER NOT NULL," + " `school` CHAR(80)," + " `sw_version` CHAR (80)," + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " PRIMARY KEY (hash)" + " )") + + TABLES['Runs'] = ( + "CREATE TABLE Runs(" + " `last_ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP " + ")") + + + + def __init__(self, db_name, user, password): + self.db_name = db_name + self.user = user + self.password = password + + + def create_database(self, cursor): + try: + cursor.execute( + "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(self.db_name)) + except mysql.connector.Error as err: + raise Exception ("Failed creating database: {}".format(err)) + + def create_tables(self, cursor): + for name, ddl in self.TABLES.iteritems(): + try: + print("Creating table {}: ".format(name), end='') + cursor.execute(ddl) + except mysql.connector.Error as err: + if err.errno == errorcode.ER_TABLE_EXISTS_ERROR: + print("already exists.") + else: + raise Exception ("Error: {}".format(err)) + else: + print("OK") + + def create (self): + self.cnx = mysql.connector.connect(user=self.user, password=self.password) + cursor = self.cnx.cursor() + """Try connect to db """ + try: + self.cnx.database = self.db_name + print("DB ["+self.db_name+"] created already, will try create tables:" ) + self.create_tables(cursor) + except mysql.connector.Error as err: + """If db not exist, then create""" + if err.errno == errorcode.ER_BAD_DB_ERROR: + self.create_database(cursor) + self.cnx.database = self.db_name + self.create_tables(cursor) + else: + raise Exception ("Error: {}".format(err)) + cursor.close() + + + + def close (self): + self.cnx.close() + + + + def store_activity_uptime(self, rrd): + + self.store_resource(rrd.get_name()) + self.store_user(rrd) + + cursor = self.cnx.cursor() + insert = ("INSERT INTO Usages " + "(user_hash, " + "resource_name, " + "start_date, " + "data_type, " + "data) " + "VALUES (%s, %s, %s, %s ,%s) ") + + for d in rrd.get_uptime_by_interval(): + info = (rrd.get_user_hash(), rrd.get_name() , datetime.fromtimestamp(float(d[0])), 'uptime', d[1]) + try: + cursor.execute(insert, info) + if self.update_last_record(rrd.get_date_last_record()) == 0: + self.cnx.commit() + + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + cursor.close() + + + def store_resource(self, resource_name): + cursor = self.cnx.cursor() + op = ("SELECT name FROM Resources WHERE name = %s") + params = (resource_name,) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + print("Resource {} already in db".format(resource_name)) + else: + insert = ("INSERT INTO Resources (name) VALUES (%s)") + info = (resource_name, ) + cursor.execute(insert, info) + self.cnx.commit() + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + + cursor.close() + + def store_user (self, rrd): + cursor = self.cnx.cursor() + op = ("SELECT hash FROM Users WHERE hash = %s") + params = (rrd.get_user_hash(), ) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + print("User {} already in db".format(rrd.user_hash)) + else: + """FIXME change hardcoded values """ + insert = ("INSERT INTO Users (hash, uuid, machine_sn, age, school, sw_version) VALUES (%s, %s, %s, %s, %s, %s)") + params = (rrd.get_user_hash(), rrd.get_uuid(), "unk_machine_sn", 0, "unk_escuela", "1.0.0") + cursor.execute(insert, params) + self.cnx.commit() + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + + cursor.close() + + + + def update_last_record (self, ts): + cursor = self.cnx.cursor() + res = 0 + op = ("SELECT * FROM Runs") + params = (datetime.fromtimestamp(float(ts)),) + try: + cursor.execute(op) + result = cursor.fetchone() + + if result != None: + op = ("UPDATE Runs SET last_ts = %s") + cursor.execute(op, params) + self.cnx.commit() + else: + op = ("INSERT INTO Runs VALUES(%s)") + cursor.execute(op, params) + self.cnx.commit() + + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + res = -1 + + cursor.close() + return res + + def get_date_last_record (self): + cursor = self.cnx.cursor() + op = ("SELECT UNIX_TIMESTAMP ((SELECT last_ts FROM Runs))") + try: + cursor.execute(op) + result = cursor.fetchone() + if result != None: + print ("last record: {}".format(result[0])) + return result[0] + else: + print ("Last date record is None") + return 0 + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + cursor.close() diff --git a/src/build/lib.linux-x86_64-2.7/stats_consolidation/rrd_files.py b/src/build/lib.linux-x86_64-2.7/stats_consolidation/rrd_files.py new file mode 100644 index 0000000..a437e0d --- /dev/null +++ b/src/build/lib.linux-x86_64-2.7/stats_consolidation/rrd_files.py @@ -0,0 +1,133 @@ +import rrdtool +import os +import sys + +class RRD: + + hdr_item = 0 + ds_item = 1 + data_item = 2 + DS = {'active':0, 'buddies':0, 'instances':0, 'new':0, 'resumed':0, 'uptime':0} + + def __init__(self, path, name, date_start, date_end): + + self.rrd_name = name + + if date_start == None: + self.date_start = str(rrdtool.first(str(os.path.join (path,name)))) + else: + self.date_start = str(date_start) + + + if date_end == None: + self.date_end = str(rrdtool.last(str(os.path.join(path, name)))) + else: + self.date_end = str(date_end) + + self.user_hash = os.path.split(path)[1] + + self.user_path = os.path.join ( + self.get_first_part_path(path, 3), + "users", + "user", + self.user_hash[:2], + self.user_hash + ) + + self.uuid = self.get_uuid_from_file(self.user_path) + + + print "*******************************************" + print " RRD " + print "start: " + self.date_start + print "end: " + self.date_end + print "PATH: " + path + print "RRD NAME: " + name + print "\n" + try: + self.rrd = rrdtool.fetch (str(os.path.join(path,name)), 'AVERAGE', '-r 60', '-s '+ self.date_start, '-e '+self.date_end) + except: + raise + + print " DS " + for item in self.DS.keys(): + idx = self.get_ds_index (item) + if idx != -1: + self.DS[item] = idx + print "DS "+ item + ": " + str(self.DS[item]) + else: + print "DS "+ item + " not found in header" + print "***********************************************" + + def get_ds_index(self, ds): + i=0 + for i in range (len (self.rrd[self.ds_item])): + if self.rrd[self.ds_item][i] == ds: + return i + i=+1 + return -1 + + def get_uptime_by_interval (self): + ds_name = "uptime" + res=list() + + print "-------Calcule "+ ds_name +"-------" + i=0 + found = False + while i < len(self.rrd[self.data_item]): + value = str(self.rrd[self.data_item][i][self.DS[ds_name]]) + if value != "None": + uptime = value + end = str (long(self.date_start) + ((i+1) * 60)) + if found == False: + found = True + start = str (long (self.date_start) + ((i+1) * 60)) + else: + if found: + print start + "->" + end + ": " + uptime + if float(uptime) > 0: + res.append((start, uptime)) + found = False + i=i+1 + return res + print "---------------------------------------------------" + + + def get_name(self): + return self.rrd_name.partition(".rrd")[0] + + def show_valid_ds(self, ds_name): + print "------------------- DS "+ ds_name +"---------------------" + i=0 + while i < len(self.rrd[self.data_item]): + timestamp = str (long (self.date_start) + ((i+1) * 60)) + value = str (self.rrd[self.data_item][i][self.DS[ds_name]]) + + if value != "None": + print timestamp+ ": " + value + i=i+1 + print "---------------------------------------------------" + + + def get_date_last_record(self): + return self.date_end + + def set_user_hash(self, u_hash): + self.user_hash = u_hash + + def get_first_part_path (self, path, idx): + l=list() + l.append(path) + for i in range (idx): + l.append(os.path.split(l[i])[0]) + return l[idx] + + def get_uuid_from_file(self,path): + return open (os.path.join(path, "machine_uuid")).next() + + + def get_user_hash(self): + return self.user_hash + + def get_uuid (self): + return self.uuid diff --git a/src/build/lib.linux-x86_64-2.7/stats_consolidation/setup.py b/src/build/lib.linux-x86_64-2.7/stats_consolidation/setup.py new file mode 100644 index 0000000..00585ee --- /dev/null +++ b/src/build/lib.linux-x86_64-2.7/stats_consolidation/setup.py @@ -0,0 +1,14 @@ +from distutils.core import setup + +setup( + name = "stats_consolidation", + version = "0.1.0", + description = "Statistics translator from rrd to relational db", + author = "Gustavo Duarte", + author_email = "gduarte@activitycentral.com", + url = "http://www.acrtivitycentral.com/", + py_modules=[ + 'consolidation','db','rrd_files' + ], +) + diff --git a/src/build/lib.linux-x86_64-2.7/stats_consolidation/test_cons.py b/src/build/lib.linux-x86_64-2.7/stats_consolidation/test_cons.py new file mode 100644 index 0000000..00f43b5 --- /dev/null +++ b/src/build/lib.linux-x86_64-2.7/stats_consolidation/test_cons.py @@ -0,0 +1,9 @@ +import stats_consolidation +from stats_consolidation import * + +db = DB_Stats('statistics', 'root', 'gustavo') +db.create(); + +con = Consolidation('/home/gustavo/AC/server_stats/sugar-stats/rrd', db) + +con.process_rrds() diff --git a/src/build/lib.linux-x86_64-2.7/stats_consolidation/test_db.py b/src/build/lib.linux-x86_64-2.7/stats_consolidation/test_db.py new file mode 100644 index 0000000..197510a --- /dev/null +++ b/src/build/lib.linux-x86_64-2.7/stats_consolidation/test_db.py @@ -0,0 +1,6 @@ +import stats_consolidation +from stats_consolidation import * + +db = DB_Stats('statistics', 'root', 'gustavo') +db.create(); + diff --git a/src/build/lib.linux-x86_64-2.7/stats_consolidation/test_rrd.py b/src/build/lib.linux-x86_64-2.7/stats_consolidation/test_rrd.py new file mode 100644 index 0000000..657ce18 --- /dev/null +++ b/src/build/lib.linux-x86_64-2.7/stats_consolidation/test_rrd.py @@ -0,0 +1,38 @@ +from db import * + +from rrd_files import * +from db import * + + +print "============================== TEST RRD -> Relational DB ========================================" +db = DB_Stats('statistics', 'root', 'gustavo') +db.create() + +DATE_START =datetime(year=2012, + month=12, + day=13, + hour=0, + minute=0, + second=0).strftime("%s") + + +DATE_END = datetime(year=2012, + month=12, + day=14, + hour=0, + minute=0, + second=0).strftime("%s") + +DATE_START = db.get_date_last_record() +DATE_END = datetime.now().strftime("%s") + +act_rrd = RRD (path = "/home/gustavo/AC/consolidation/rrds", name="pippy.rrd", date_start=DATE_START, date_end=DATE_END) +""" +act_rrd.show_valid_ds("uptime") +act_rrd.show_valid_ds("resumed") +act_rrd.show_valid_ds("new") +act_rrd.show_valid_ds("instances") +act_rrd.show_valid_ds("buddies") +""" +data = {} +db.store_activity_uptime(act_rrd) diff --git a/src/consolidation_run b/src/consolidation_run index 411058d..6de6ef4 100755 --- a/src/consolidation_run +++ b/src/consolidation_run @@ -16,12 +16,10 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. from __future__ import print_function - import argparse -import consolidation -import db -from consolidation import * -from db import * +from stats_consolidation.db import * +from stats_consolidation.rrd_files import * +from stats_consolidation.consolidation import * parser = argparse.ArgumentParser() parser.add_argument('--db_user',required=True) @@ -32,18 +30,23 @@ parser.add_argument('--log_path',required=True) args = parser.parse_args() - +""" try: db = DB_Stats (args.db_name, args.db_user, args.db_pass) db.create() except Exception as e: - print ("Creating DB: {}".format (e.msg)) - exit(1) + print ("Creating DB: {}".format (str(e) try: con = Consolidation(args.rrd_path, db) con.process_rrds() except Exception as e: - print ("Processing rrd file: {}".format(e.msg)) + print ("Processing rrd file: {}".format(str(e) +""" +db = DB_Stats (args.db_name, args.db_user, args.db_pass) +db.create() + +con = Consolidation(args.rrd_path, db) +con.process_rrds() diff --git a/src/stats_consolidation/__init__.py b/src/stats_consolidation/__init__.py new file mode 100644 index 0000000..c06c47d --- /dev/null +++ b/src/stats_consolidation/__init__.py @@ -0,0 +1 @@ +"""stats_consolidation""" diff --git a/src/stats_consolidation/build/lib.linux-x86_64-2.7/consolidation.py b/src/stats_consolidation/build/lib.linux-x86_64-2.7/consolidation.py new file mode 100644 index 0000000..4003594 --- /dev/null +++ b/src/stats_consolidation/build/lib.linux-x86_64-2.7/consolidation.py @@ -0,0 +1,34 @@ +import os +import stats_consolidation +import argparse + +from rrd_files import * +from db import * + +class Consolidation: + + def __init__(self, path, db): + self.base_path = path + self.date_start = db.get_date_last_record() + self.db = db + def process_rrds (self): + id_hash_list = os.listdir(unicode(self.base_path)) + if id_hash_list: + for id_hash in id_hash_list: + user_hash_list = os.listdir( unicode( os.path.join(self.base_path, id_hash) ) ) + if user_hash_list: + for user_hash in user_hash_list: + rrd_list = os.listdir( unicode(os.path.join(self.base_path, id_hash, user_hash)) ) + if rrd_list: + for rrd in rrd_list: + rrd_path = unicode (os.path.join(self.base_path, id_hash, user_hash) ) + rrd_obj = RRD (path=rrd_path, name=rrd, date_start=self.date_start, date_end=None) + self.db.store_activity_uptime(rrd_obj) + else: + print "None rrd file found" + os.path.join(self.base_path, id_hash, user_hash) + else: + print "None hash user found on: " + os.path.join(self.base_path, id_hash) + else: + print "None hash ids found on: " + self.base_path + + diff --git a/src/stats_consolidation/build/lib.linux-x86_64-2.7/db.py b/src/stats_consolidation/build/lib.linux-x86_64-2.7/db.py new file mode 100644 index 0000000..a8939e5 --- /dev/null +++ b/src/stats_consolidation/build/lib.linux-x86_64-2.7/db.py @@ -0,0 +1,202 @@ +from __future__ import print_function +import mysql.connector +from mysql.connector import errorcode +from datetime import datetime + +class DB_Stats: + TABLES={} + + TABLES['Usages'] = ( + "CREATE TABLE `Usages` (" + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " `user_hash` CHAR(40) NOT NULL," + " `resource_name` CHAR(80)," + " `start_date` TIMESTAMP NOT NULL," + " `data_type` CHAR (30) NOT NULL," + " `data` INTEGER NOT NULL," + " PRIMARY KEY (`user_hash`,`start_date`,`resource_name`, `data_type`)" + " )") + + TABLES['Resources'] = ( + "CREATE TABLE Resources (" + " `name` CHAR(250)," + " PRIMARY KEY (name)" + " )") + + TABLES['Users'] = ( + "CREATE TABLE Users(" + " `hash` CHAR (40) NOT NULL," + " `uuid` CHAR (32) NOT NULL," + " `machine_sn` CHAR(80)," + " `age` INTEGER NOT NULL," + " `school` CHAR(80)," + " `sw_version` CHAR (80)," + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " PRIMARY KEY (hash)" + " )") + + TABLES['Runs'] = ( + "CREATE TABLE Runs(" + " `last_ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP " + ")") + + + + def __init__(self, db_name, user, password): + self.db_name = db_name + self.user = user + self.password = password + + + def create_database(self, cursor): + try: + cursor.execute( + "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(self.db_name)) + except mysql.connector.Error as err: + raise Exception ("Failed creating database: {}".format(err)) + + def create_tables(self, cursor): + for name, ddl in self.TABLES.iteritems(): + try: + print("Creating table {}: ".format(name), end='') + cursor.execute(ddl) + except mysql.connector.Error as err: + if err.errno == errorcode.ER_TABLE_EXISTS_ERROR: + print("already exists.") + else: + raise Exception ("Error: {}".format(err)) + else: + print("OK") + + def create (self): + self.cnx = mysql.connector.connect(user=self.user, password=self.password) + cursor = self.cnx.cursor() + """Try connect to db """ + try: + self.cnx.database = self.db_name + print("DB ["+self.db_name+"] created already, will try create tables:" ) + self.create_tables(cursor) + except mysql.connector.Error as err: + """If db not exist, then create""" + if err.errno == errorcode.ER_BAD_DB_ERROR: + self.create_database(cursor) + self.cnx.database = self.db_name + self.create_tables(cursor) + else: + raise Exception ("Error: {}".format(err)) + cursor.close() + + + + def close (self): + self.cnx.close() + + + + def store_activity_uptime(self, rrd): + + self.store_resource(rrd.get_name()) + self.store_user(rrd) + + cursor = self.cnx.cursor() + insert = ("INSERT INTO Usages " + "(user_hash, " + "resource_name, " + "start_date, " + "data_type, " + "data) " + "VALUES (%s, %s, %s, %s ,%s) ") + + for d in rrd.get_uptime_by_interval(): + info = (rrd.get_user_hash(), rrd.get_name() , datetime.fromtimestamp(float(d[0])), 'uptime', d[1]) + try: + cursor.execute(insert, info) + if self.update_last_record(rrd.get_date_last_record()) == 0: + self.cnx.commit() + + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + cursor.close() + + + def store_resource(self, resource_name): + cursor = self.cnx.cursor() + op = ("SELECT name FROM Resources WHERE name = %s") + params = (resource_name,) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + print("Resource {} already in db".format(resource_name)) + else: + insert = ("INSERT INTO Resources (name) VALUES (%s)") + info = (resource_name, ) + cursor.execute(insert, info) + self.cnx.commit() + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + + cursor.close() + + def store_user (self, rrd): + cursor = self.cnx.cursor() + op = ("SELECT hash FROM Users WHERE hash = %s") + params = (rrd.get_user_hash(), ) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + print("User {} already in db".format(rrd.user_hash)) + else: + """FIXME change hardcoded values """ + insert = ("INSERT INTO Users (hash, uuid, machine_sn, age, school, sw_version) VALUES (%s, %s, %s, %s, %s, %s)") + params = (rrd.get_user_hash(), rrd.get_uuid(), "unk_machine_sn", 0, "unk_escuela", "1.0.0") + cursor.execute(insert, params) + self.cnx.commit() + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + + cursor.close() + + + + def update_last_record (self, ts): + cursor = self.cnx.cursor() + res = 0 + op = ("SELECT * FROM Runs") + params = (datetime.fromtimestamp(float(ts)),) + try: + cursor.execute(op) + result = cursor.fetchone() + + if result != None: + op = ("UPDATE Runs SET last_ts = %s") + cursor.execute(op, params) + self.cnx.commit() + else: + op = ("INSERT INTO Runs VALUES(%s)") + cursor.execute(op, params) + self.cnx.commit() + + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + res = -1 + + cursor.close() + return res + + def get_date_last_record (self): + cursor = self.cnx.cursor() + op = ("SELECT UNIX_TIMESTAMP ((SELECT last_ts FROM Runs))") + try: + cursor.execute(op) + result = cursor.fetchone() + if result != None: + print ("last record: {}".format(result[0])) + return result[0] + else: + print ("Last date record is None") + return 0 + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + cursor.close() diff --git a/src/stats_consolidation/build/lib.linux-x86_64-2.7/rrd_files.py b/src/stats_consolidation/build/lib.linux-x86_64-2.7/rrd_files.py new file mode 100644 index 0000000..a437e0d --- /dev/null +++ b/src/stats_consolidation/build/lib.linux-x86_64-2.7/rrd_files.py @@ -0,0 +1,133 @@ +import rrdtool +import os +import sys + +class RRD: + + hdr_item = 0 + ds_item = 1 + data_item = 2 + DS = {'active':0, 'buddies':0, 'instances':0, 'new':0, 'resumed':0, 'uptime':0} + + def __init__(self, path, name, date_start, date_end): + + self.rrd_name = name + + if date_start == None: + self.date_start = str(rrdtool.first(str(os.path.join (path,name)))) + else: + self.date_start = str(date_start) + + + if date_end == None: + self.date_end = str(rrdtool.last(str(os.path.join(path, name)))) + else: + self.date_end = str(date_end) + + self.user_hash = os.path.split(path)[1] + + self.user_path = os.path.join ( + self.get_first_part_path(path, 3), + "users", + "user", + self.user_hash[:2], + self.user_hash + ) + + self.uuid = self.get_uuid_from_file(self.user_path) + + + print "*******************************************" + print " RRD " + print "start: " + self.date_start + print "end: " + self.date_end + print "PATH: " + path + print "RRD NAME: " + name + print "\n" + try: + self.rrd = rrdtool.fetch (str(os.path.join(path,name)), 'AVERAGE', '-r 60', '-s '+ self.date_start, '-e '+self.date_end) + except: + raise + + print " DS " + for item in self.DS.keys(): + idx = self.get_ds_index (item) + if idx != -1: + self.DS[item] = idx + print "DS "+ item + ": " + str(self.DS[item]) + else: + print "DS "+ item + " not found in header" + print "***********************************************" + + def get_ds_index(self, ds): + i=0 + for i in range (len (self.rrd[self.ds_item])): + if self.rrd[self.ds_item][i] == ds: + return i + i=+1 + return -1 + + def get_uptime_by_interval (self): + ds_name = "uptime" + res=list() + + print "-------Calcule "+ ds_name +"-------" + i=0 + found = False + while i < len(self.rrd[self.data_item]): + value = str(self.rrd[self.data_item][i][self.DS[ds_name]]) + if value != "None": + uptime = value + end = str (long(self.date_start) + ((i+1) * 60)) + if found == False: + found = True + start = str (long (self.date_start) + ((i+1) * 60)) + else: + if found: + print start + "->" + end + ": " + uptime + if float(uptime) > 0: + res.append((start, uptime)) + found = False + i=i+1 + return res + print "---------------------------------------------------" + + + def get_name(self): + return self.rrd_name.partition(".rrd")[0] + + def show_valid_ds(self, ds_name): + print "------------------- DS "+ ds_name +"---------------------" + i=0 + while i < len(self.rrd[self.data_item]): + timestamp = str (long (self.date_start) + ((i+1) * 60)) + value = str (self.rrd[self.data_item][i][self.DS[ds_name]]) + + if value != "None": + print timestamp+ ": " + value + i=i+1 + print "---------------------------------------------------" + + + def get_date_last_record(self): + return self.date_end + + def set_user_hash(self, u_hash): + self.user_hash = u_hash + + def get_first_part_path (self, path, idx): + l=list() + l.append(path) + for i in range (idx): + l.append(os.path.split(l[i])[0]) + return l[idx] + + def get_uuid_from_file(self,path): + return open (os.path.join(path, "machine_uuid")).next() + + + def get_user_hash(self): + return self.user_hash + + def get_uuid (self): + return self.uuid diff --git a/src/stats_consolidation/consolidation.py b/src/stats_consolidation/consolidation.py new file mode 100644 index 0000000..01d8b84 --- /dev/null +++ b/src/stats_consolidation/consolidation.py @@ -0,0 +1,35 @@ +import os +import argparse + +import rrd_files +import db +from rrd_files import * +from db import * + +class Consolidation: + + def __init__(self, path, db): + self.base_path = path + self.date_start = db.get_date_last_record() + self.db = db + def process_rrds (self): + id_hash_list = os.listdir(unicode(self.base_path)) + if id_hash_list: + for id_hash in id_hash_list: + user_hash_list = os.listdir( unicode( os.path.join(self.base_path, id_hash) ) ) + if user_hash_list: + for user_hash in user_hash_list: + rrd_list = os.listdir( unicode(os.path.join(self.base_path, id_hash, user_hash)) ) + if rrd_list: + for rrd in rrd_list: + rrd_path = unicode (os.path.join(self.base_path, id_hash, user_hash) ) + rrd_obj = RRD (path=rrd_path, name=rrd, date_start=self.date_start, date_end=None) + self.db.store_activity_uptime(rrd_obj) + else: + print "None rrd file found" + os.path.join(self.base_path, id_hash, user_hash) + else: + print "None hash user found on: " + os.path.join(self.base_path, id_hash) + else: + print "None hash ids found on: " + self.base_path + + diff --git a/src/stats_consolidation/db.py b/src/stats_consolidation/db.py new file mode 100644 index 0000000..ddc1006 --- /dev/null +++ b/src/stats_consolidation/db.py @@ -0,0 +1,206 @@ +from __future__ import print_function +import mysql.connector +from mysql.connector import errorcode +from datetime import datetime + +import stats_consolidation.rrd_files + +from rrd_files import * + +class DB_Stats: + TABLES={} + + TABLES['Usages'] = ( + "CREATE TABLE `Usages` (" + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " `user_hash` CHAR(40) NOT NULL," + " `resource_name` CHAR(80)," + " `start_date` TIMESTAMP NOT NULL," + " `data_type` CHAR (30) NOT NULL," + " `data` INTEGER NOT NULL," + " PRIMARY KEY (`user_hash`,`start_date`,`resource_name`, `data_type`)" + " )") + + TABLES['Resources'] = ( + "CREATE TABLE Resources (" + " `name` CHAR(250)," + " PRIMARY KEY (name)" + " )") + + TABLES['Users'] = ( + "CREATE TABLE Users(" + " `hash` CHAR (40) NOT NULL," + " `uuid` CHAR (32) NOT NULL," + " `machine_sn` CHAR(80)," + " `age` INTEGER NOT NULL," + " `school` CHAR(80)," + " `sw_version` CHAR (80)," + " `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + " PRIMARY KEY (hash)" + " )") + + TABLES['Runs'] = ( + "CREATE TABLE Runs(" + " `last_ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP " + ")") + + + + def __init__(self, db_name, user, password): + self.db_name = db_name + self.user = user + self.password = password + + + def create_database(self, cursor): + try: + cursor.execute( + "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(self.db_name)) + except mysql.connector.Error as err: + raise Exception ("Failed creating database: {}".format(err)) + + def create_tables(self, cursor): + for name, ddl in self.TABLES.iteritems(): + try: + print("Creating table {}: ".format(name), end='') + cursor.execute(ddl) + except mysql.connector.Error as err: + if err.errno == errorcode.ER_TABLE_EXISTS_ERROR: + print("already exists.") + else: + raise Exception ("Error: {}".format(err)) + else: + print("OK") + + def create (self): + self.cnx = mysql.connector.connect(user=self.user, password=self.password) + cursor = self.cnx.cursor() + """Try connect to db """ + try: + self.cnx.database = self.db_name + print("DB ["+self.db_name+"] created already, will try create tables:" ) + self.create_tables(cursor) + except mysql.connector.Error as err: + """If db not exist, then create""" + if err.errno == errorcode.ER_BAD_DB_ERROR: + self.create_database(cursor) + self.cnx.database = self.db_name + self.create_tables(cursor) + else: + raise Exception ("Error: {}".format(err)) + cursor.close() + + + + def close (self): + self.cnx.close() + + + + def store_activity_uptime(self, rrd): + + self.store_resource(rrd.get_name()) + self.store_user(rrd) + + cursor = self.cnx.cursor() + insert = ("INSERT INTO Usages " + "(user_hash, " + "resource_name, " + "start_date, " + "data_type, " + "data) " + "VALUES (%s, %s, %s, %s ,%s) ") + + for d in rrd.get_uptime_by_interval(): + info = (rrd.get_user_hash(), rrd.get_name() , datetime.fromtimestamp(float(d[0])), 'uptime', d[1]) + try: + cursor.execute(insert, info) + if self.update_last_record(rrd.get_date_last_record()) == 0: + self.cnx.commit() + + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + cursor.close() + + + def store_resource(self, resource_name): + cursor = self.cnx.cursor() + op = ("SELECT name FROM Resources WHERE name = %s") + params = (resource_name,) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + print("Resource {} already in db".format(resource_name)) + else: + insert = ("INSERT INTO Resources (name) VALUES (%s)") + info = (resource_name, ) + cursor.execute(insert, info) + self.cnx.commit() + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + + cursor.close() + + def store_user (self, rrd): + cursor = self.cnx.cursor() + op = ("SELECT hash FROM Users WHERE hash = %s") + params = (rrd.get_user_hash(), ) + try: + cursor.execute(op, params) + result = cursor.fetchone() + if result != None: + print("User {} already in db".format(rrd.user_hash)) + else: + """FIXME change hardcoded values """ + insert = ("INSERT INTO Users (hash, uuid, machine_sn, age, school, sw_version) VALUES (%s, %s, %s, %s, %s, %s)") + params = (rrd.get_user_hash(), rrd.get_uuid(), "unk_machine_sn", 0, "unk_escuela", "1.0.0") + cursor.execute(insert, params) + self.cnx.commit() + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + + cursor.close() + + + + def update_last_record (self, ts): + cursor = self.cnx.cursor() + res = 0 + op = ("SELECT * FROM Runs") + params = (datetime.fromtimestamp(float(ts)),) + try: + cursor.execute(op) + result = cursor.fetchone() + + if result != None: + op = ("UPDATE Runs SET last_ts = %s") + cursor.execute(op, params) + self.cnx.commit() + else: + op = ("INSERT INTO Runs VALUES(%s)") + cursor.execute(op, params) + self.cnx.commit() + + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + res = -1 + + cursor.close() + return res + + def get_date_last_record (self): + cursor = self.cnx.cursor() + op = ("SELECT UNIX_TIMESTAMP ((SELECT last_ts FROM Runs))") + try: + cursor.execute(op) + result = cursor.fetchone() + if result != None: + print ("last record: {}".format(result[0])) + return result[0] + else: + print ("Last date record is None") + return 0 + except mysql.connector.Error as err: + print("Fail {}: {}".format(cursor.statement, err)) + cursor.close() diff --git a/src/stats_consolidation/rrd_files.py b/src/stats_consolidation/rrd_files.py new file mode 100644 index 0000000..a437e0d --- /dev/null +++ b/src/stats_consolidation/rrd_files.py @@ -0,0 +1,133 @@ +import rrdtool +import os +import sys + +class RRD: + + hdr_item = 0 + ds_item = 1 + data_item = 2 + DS = {'active':0, 'buddies':0, 'instances':0, 'new':0, 'resumed':0, 'uptime':0} + + def __init__(self, path, name, date_start, date_end): + + self.rrd_name = name + + if date_start == None: + self.date_start = str(rrdtool.first(str(os.path.join (path,name)))) + else: + self.date_start = str(date_start) + + + if date_end == None: + self.date_end = str(rrdtool.last(str(os.path.join(path, name)))) + else: + self.date_end = str(date_end) + + self.user_hash = os.path.split(path)[1] + + self.user_path = os.path.join ( + self.get_first_part_path(path, 3), + "users", + "user", + self.user_hash[:2], + self.user_hash + ) + + self.uuid = self.get_uuid_from_file(self.user_path) + + + print "*******************************************" + print " RRD " + print "start: " + self.date_start + print "end: " + self.date_end + print "PATH: " + path + print "RRD NAME: " + name + print "\n" + try: + self.rrd = rrdtool.fetch (str(os.path.join(path,name)), 'AVERAGE', '-r 60', '-s '+ self.date_start, '-e '+self.date_end) + except: + raise + + print " DS " + for item in self.DS.keys(): + idx = self.get_ds_index (item) + if idx != -1: + self.DS[item] = idx + print "DS "+ item + ": " + str(self.DS[item]) + else: + print "DS "+ item + " not found in header" + print "***********************************************" + + def get_ds_index(self, ds): + i=0 + for i in range (len (self.rrd[self.ds_item])): + if self.rrd[self.ds_item][i] == ds: + return i + i=+1 + return -1 + + def get_uptime_by_interval (self): + ds_name = "uptime" + res=list() + + print "-------Calcule "+ ds_name +"-------" + i=0 + found = False + while i < len(self.rrd[self.data_item]): + value = str(self.rrd[self.data_item][i][self.DS[ds_name]]) + if value != "None": + uptime = value + end = str (long(self.date_start) + ((i+1) * 60)) + if found == False: + found = True + start = str (long (self.date_start) + ((i+1) * 60)) + else: + if found: + print start + "->" + end + ": " + uptime + if float(uptime) > 0: + res.append((start, uptime)) + found = False + i=i+1 + return res + print "---------------------------------------------------" + + + def get_name(self): + return self.rrd_name.partition(".rrd")[0] + + def show_valid_ds(self, ds_name): + print "------------------- DS "+ ds_name +"---------------------" + i=0 + while i < len(self.rrd[self.data_item]): + timestamp = str (long (self.date_start) + ((i+1) * 60)) + value = str (self.rrd[self.data_item][i][self.DS[ds_name]]) + + if value != "None": + print timestamp+ ": " + value + i=i+1 + print "---------------------------------------------------" + + + def get_date_last_record(self): + return self.date_end + + def set_user_hash(self, u_hash): + self.user_hash = u_hash + + def get_first_part_path (self, path, idx): + l=list() + l.append(path) + for i in range (idx): + l.append(os.path.split(l[i])[0]) + return l[idx] + + def get_uuid_from_file(self,path): + return open (os.path.join(path, "machine_uuid")).next() + + + def get_user_hash(self): + return self.user_hash + + def get_uuid (self): + return self.uuid diff --git a/src/stats_consolidation/test_cons.py b/src/stats_consolidation/test_cons.py new file mode 100644 index 0000000..00f43b5 --- /dev/null +++ b/src/stats_consolidation/test_cons.py @@ -0,0 +1,9 @@ +import stats_consolidation +from stats_consolidation import * + +db = DB_Stats('statistics', 'root', 'gustavo') +db.create(); + +con = Consolidation('/home/gustavo/AC/server_stats/sugar-stats/rrd', db) + +con.process_rrds() diff --git a/src/stats_consolidation/test_db.py b/src/stats_consolidation/test_db.py new file mode 100644 index 0000000..197510a --- /dev/null +++ b/src/stats_consolidation/test_db.py @@ -0,0 +1,6 @@ +import stats_consolidation +from stats_consolidation import * + +db = DB_Stats('statistics', 'root', 'gustavo') +db.create(); + diff --git a/src/stats_consolidation/test_rrd.py b/src/stats_consolidation/test_rrd.py new file mode 100644 index 0000000..657ce18 --- /dev/null +++ b/src/stats_consolidation/test_rrd.py @@ -0,0 +1,38 @@ +from db import * + +from rrd_files import * +from db import * + + +print "============================== TEST RRD -> Relational DB ========================================" +db = DB_Stats('statistics', 'root', 'gustavo') +db.create() + +DATE_START =datetime(year=2012, + month=12, + day=13, + hour=0, + minute=0, + second=0).strftime("%s") + + +DATE_END = datetime(year=2012, + month=12, + day=14, + hour=0, + minute=0, + second=0).strftime("%s") + +DATE_START = db.get_date_last_record() +DATE_END = datetime.now().strftime("%s") + +act_rrd = RRD (path = "/home/gustavo/AC/consolidation/rrds", name="pippy.rrd", date_start=DATE_START, date_end=DATE_END) +""" +act_rrd.show_valid_ds("uptime") +act_rrd.show_valid_ds("resumed") +act_rrd.show_valid_ds("new") +act_rrd.show_valid_ds("instances") +act_rrd.show_valid_ds("buddies") +""" +data = {} +db.store_activity_uptime(act_rrd) |