From 3adf03028dfb9f6e3c2c80ec8bbcdd0ae47edf6f Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Tue, 9 Jun 2015 12:10:18 +0200 Subject: [PATCH] mxqdump: Move database related functions to mxq_group and mxq_job --- Makefile | 2 + mxq_group.c | 285 +++++++++++++++++++++++++++ mxq_group.h | 9 + mxq_job.c | 208 ++++++++++++++++++++ mxq_job.h | 7 +- mxqdump.c | 558 ++-------------------------------------------------- 6 files changed, 521 insertions(+), 548 deletions(-) diff --git a/Makefile b/Makefile index e4b2effc..1f7568b5 100644 --- a/Makefile +++ b/Makefile @@ -356,6 +356,7 @@ mxqd: mxq_group.o mxqd: mxq_job.o mxqd: mxq_util.o mxqd: mxq_mysql.o +mxqd: mx_mysql.o mxqd: LDLIBS += $(LDLIBS_MYSQL) build: mxqd @@ -392,6 +393,7 @@ mxqdump: mxq_util.o mxqdump: mx_util.o mxqdump: mx_getopt.o mxqdump: LDLIBS += $(LDLIBS_MYSQL) +mxqdump: CFLAGS += -Wunused-function build: mxqdump diff --git a/mxq_group.c b/mxq_group.c index 3c94535c..d24ca88e 100644 --- a/mxq_group.c +++ b/mxq_group.c @@ -10,6 +10,34 @@ #include "mxq_group.h" #include "mxq_job.h" #include "mxq_mysql.h" +#include "mx_mysql.h" + +#define GROUP_FIELDS_CNT 24 +#define GROUP_FIELDS \ + " group_id," \ + " group_name," \ + " group_status," \ + " group_priority," \ + " user_uid," \ + " user_name," \ + " user_gid," \ + " user_group," \ + " job_command," \ + " job_threads," \ + " job_memory," \ + " job_time," \ + " group_jobs," \ + " group_jobs_inq," \ + " group_jobs_running," \ + " group_jobs_finished," \ + " group_jobs_failed," \ + " group_jobs_cancelled," \ + " group_jobs_unknown," \ + " group_slots_running," \ + " stats_max_maxrss," \ + " stats_max_utime_sec," \ + " stats_max_stime_sec," \ + " stats_max_real_sec" #define MXQ_GROUP_FIELDS "group_id," \ "group_name," \ @@ -62,6 +90,48 @@ enum mxq_group_columns { MXQ_GROUP_COL__END }; +static int bind_result_group_fields(struct mx_mysql_bind *result, struct mxq_group *g) +{ + int res = 0; + int idx = 0; + + res = mx_mysql_bind_init_result(result, GROUP_FIELDS_CNT); + assert(res >= 0); + + res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_id)); + res += mx_mysql_bind_var(result, idx++, string, &(g->group_name)); + res += mx_mysql_bind_var(result, idx++, uint8, &(g->group_status)); + res += mx_mysql_bind_var(result, idx++, uint16, &(g->group_priority)); + + res += mx_mysql_bind_var(result, idx++, uint32, &(g->user_uid)); + res += mx_mysql_bind_var(result, idx++, string, &(g->user_name)); + res += mx_mysql_bind_var(result, idx++, uint32, &(g->user_gid)); + res += mx_mysql_bind_var(result, idx++, string, &(g->user_group)); + + res += mx_mysql_bind_var(result, idx++, string, &(g->job_command)); + + res += mx_mysql_bind_var(result, idx++, uint16, &(g->job_threads)); + res += mx_mysql_bind_var(result, idx++, uint64, &(g->job_memory)); + res += mx_mysql_bind_var(result, idx++, uint32, &(g->job_time)); + + res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs)); + res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_inq)); + res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_running)); + res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_finished)); + res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_failed)); + res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_cancelled)); + res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_unknown)); + + res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_slots_running)); + + res += mx_mysql_bind_var(result, idx++, uint32, &(g->stats_max_maxrss)); + res += mx_mysql_bind_var(result, idx++, int64, &(g->stats_max_utime.tv_sec)); + res += mx_mysql_bind_var(result, idx++, int64, &(g->stats_max_stime.tv_sec)); + res += mx_mysql_bind_var(result, idx++, int64, &(g->stats_max_real.tv_sec)); + + return res; +} + static inline int mxq_group_bind_results(MYSQL_BIND *bind, struct mxq_group *g) { memset(bind, 0, sizeof(*bind)*MXQ_GROUP_COL__END); @@ -173,7 +243,222 @@ inline uint64_t mxq_group_jobs_inq(struct mxq_group *g) return inq; } +int mxq_load_group(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t group_id) +{ + int res; + struct mxq_group *groups = NULL; + struct mxq_group g = {0}; + struct mx_mysql_bind param = {0}; + struct mx_mysql_bind result = {0}; + + assert(mysql); + assert(mxq_groups); + assert(!(*mxq_groups)); + + char *query = + "SELECT" + GROUP_FIELDS + " FROM mxq_group" + " WHERE group_id = ?" + " LIMIT 1"; + + res = mx_mysql_bind_init_param(¶m, 1); + assert(res == 0); + res = mx_mysql_bind_var(¶m, 0, uint64, &group_id); + assert(res == 0); + + res = bind_result_group_fields(&result, &g); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, ¶m, &result, &g, (void **)&groups, sizeof(*groups)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + *mxq_groups = groups; + return res; +} + +int mxq_load_all_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups) +{ + int res; + struct mxq_group *groups = NULL; + struct mxq_group g = {0}; + struct mx_mysql_bind result = {0}; + + assert(mysql); + assert(mxq_groups); + assert(!(*mxq_groups)); + + char *query = + "SELECT" + GROUP_FIELDS + " FROM mxq_group" + " ORDER BY user_name, group_mtime" + " LIMIT 1000"; + + res = bind_result_group_fields(&result, &g); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, NULL, &result, &g, (void **)&groups, sizeof(*groups)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + *mxq_groups = groups; + return res; +} +int mxq_load_all_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid) +{ + int res; + struct mxq_group *groups = NULL; + struct mxq_group g = {0}; + struct mx_mysql_bind param = {0}; + struct mx_mysql_bind result = {0}; + + assert(mysql); + assert(mxq_groups); + assert(!(*mxq_groups)); + + char *query = + "SELECT" + GROUP_FIELDS + " FROM mxq_group" + " WHERE user_uid = ?" + " ORDER BY user_name, group_mtime" + " LIMIT 1000"; + + res = mx_mysql_bind_init_param(¶m, 1); + assert(res == 0); + res = mx_mysql_bind_var(¶m, 0, uint64, &user_uid); + assert(res == 0); + + res = bind_result_group_fields(&result, &g); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, ¶m, &result, &g, (void **)&groups, sizeof(*groups)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + *mxq_groups = groups; + return res; +} + +int mxq_load_active_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid) +{ + int res; + struct mxq_group *groups = NULL; + struct mxq_group g = {0}; + struct mx_mysql_bind result = {0}; + struct mx_mysql_bind param = {0}; + + assert(mysql); + assert(mxq_groups); + assert(!(*mxq_groups)); + + char *query = + "SELECT" + GROUP_FIELDS + " FROM mxq_group" + " WHERE ((group_jobs_inq > 0 OR group_jobs_running > 0)" + " OR (NOW()-group_mtime < 604800))" + " AND user_uid = ?" + " ORDER BY user_name, group_mtime" + " LIMIT 1000"; + + res = mx_mysql_bind_init_param(¶m, 1); + assert(res == 0); + res = mx_mysql_bind_var(¶m, 0, uint64, &user_uid); + assert(res == 0); + + res = bind_result_group_fields(&result, &g); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, ¶m, &result, &g, (void **)&groups, sizeof(*groups)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + *mxq_groups = groups; + return res; +} + +int mxq_load_running_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups) +{ + int res; + struct mxq_group *groups = NULL; + struct mxq_group g = {0}; + struct mx_mysql_bind result = {0}; + + assert(mysql); + assert(mxq_groups); + assert(!(*mxq_groups)); + + char *query = + "SELECT" + GROUP_FIELDS + " FROM mxq_group" + " WHERE (group_jobs_inq > 0 OR group_jobs_running > 0)" + " ORDER BY user_name, group_mtime" + " LIMIT 1000"; + + res = bind_result_group_fields(&result, &g); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, NULL, &result, &g, (void **)&groups, sizeof(*groups)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + *mxq_groups = groups; + return res; +} + +int mxq_load_running_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid) +{ + int res; + struct mxq_group *groups = NULL; + struct mxq_group g = {0}; + struct mx_mysql_bind param = {0}; + struct mx_mysql_bind result = {0}; + + assert(mysql); + assert(mxq_groups); + assert(!(*mxq_groups)); + + char *query = + "SELECT" + GROUP_FIELDS + " FROM mxq_group" + " WHERE (group_jobs_inq > 0 OR group_jobs_running > 0)" + " AND user_uid = ?" + " ORDER BY user_name, group_mtime" + " LIMIT 1000"; + + res = mx_mysql_bind_init_param(¶m, 1); + assert(res == 0); + res = mx_mysql_bind_var(¶m, 0, uint64, &user_uid); + assert(res == 0); + + res = bind_result_group_fields(&result, &g); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, ¶m, &result, &g, (void **)&groups, sizeof(*groups)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + *mxq_groups = groups; + return res; +} int mxq_group_load_active_groups(MYSQL *mysql, struct mxq_group **mxq_group) { diff --git a/mxq_group.h b/mxq_group.h index 2a4b6d18..ea4b8e97 100644 --- a/mxq_group.h +++ b/mxq_group.h @@ -4,6 +4,8 @@ #include #include +#include "mx_mysql.h" + struct mxq_group { uint64_t group_id; @@ -52,6 +54,13 @@ inline uint64_t mxq_group_jobs_done(struct mxq_group *g); inline uint64_t mxq_group_jobs_active(struct mxq_group *g); inline uint64_t mxq_group_jobs_inq(struct mxq_group *g); +int mxq_load_group(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t group_id); +int mxq_load_all_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups); +int mxq_load_all_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid); +int mxq_load_active_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid); +int mxq_load_running_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups); +int mxq_load_running_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid); + int mxq_group_load_active_groups(MYSQL *mysql, struct mxq_group **mxq_group); void mxq_group_free_content(struct mxq_group *g); int mxq_group_update_status_cancelled(MYSQL *mysql, struct mxq_group *group); diff --git a/mxq_job.c b/mxq_job.c index f617c29f..428a7f24 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -17,6 +17,49 @@ #include "mxq_job.h" #include "mxq_mysql.h" +#define JOB_FIELDS_CNT 34 +#define JOB_FIELDS \ + " job_id, " \ + " job_status, " \ + " job_flags, " \ + " job_priority, " \ + " group_id, " \ + \ + " job_workdir, " \ + " job_argc, " \ + " job_argv, " \ + " job_stdout, " \ + " job_stderr, " \ + \ + " job_umask, " \ + " host_submit, " \ + " server_id, " \ + " host_hostname, " \ + " host_pid, " \ + \ + " host_slots, " \ + " UNIX_TIMESTAMP(date_submit) as date_submit, " \ + " UNIX_TIMESTAMP(date_start) as date_start, " \ + " UNIX_TIMESTAMP(date_end) as date_end, " \ + " stats_status, " \ + \ + " stats_utime_sec, " \ + " stats_utime_usec, " \ + " stats_stime_sec, " \ + " stats_stime_usec, " \ + " stats_real_sec, " \ + \ + " stats_real_usec, " \ + " stats_maxrss, " \ + " stats_minflt, " \ + " stats_majflt, " \ + " stats_nswap, " \ + \ + " stats_inblock, " \ + " stats_oublock, " \ + " stats_nvcsw, " \ + " stats_nivcsw" + #define MXQ_JOB_FIELDS "job_id, " \ "job_status, " \ "job_flags, " \ @@ -90,6 +133,58 @@ enum mxq_job_columns { MXQ_JOB_COL__END }; +static int bind_result_job_fields(struct mx_mysql_bind *result, struct mxq_job *j) +{ + int res = 0; + int idx = 0; + + res = mx_mysql_bind_init_result(result, JOB_FIELDS_CNT); + assert(res >= 0); + + res += mx_mysql_bind_var(result, idx++, uint64, &(j->job_id)); + res += mx_mysql_bind_var(result, idx++, uint16, &(j->job_status)); + res += mx_mysql_bind_var(result, idx++, uint64, &(j->job_flags)); + res += mx_mysql_bind_var(result, idx++, uint16, &(j->job_priority)); + res += mx_mysql_bind_var(result, idx++, uint64, &(j->group_id)); + + res += mx_mysql_bind_var(result, idx++, string, &(j->job_workdir)); + res += mx_mysql_bind_var(result, idx++, uint16, &(j->job_argc)); + res += mx_mysql_bind_var(result, idx++, string, &(j->job_argv_str)); + res += mx_mysql_bind_var(result, idx++, string, &(j->job_stdout)); + res += mx_mysql_bind_var(result, idx++, string, &(j->job_stderr)); + + res += mx_mysql_bind_var(result, idx++, uint32, &(j->job_umask)); + res += mx_mysql_bind_var(result, idx++, string, &(j->host_submit)); + res += mx_mysql_bind_var(result, idx++, string, &(j->server_id)); + res += mx_mysql_bind_var(result, idx++, string, &(j->host_hostname)); + res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_pid)); + + res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_slots)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->date_submit)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->date_start)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->date_end)); + res += mx_mysql_bind_var(result, idx++, int32, &(j->stats_status)); + + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_utime.tv_sec)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_utime.tv_usec)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_stime.tv_sec)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_stime.tv_usec)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_realtime.tv_sec)); + + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_realtime.tv_usec)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_maxrss)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_minflt)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_majflt)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_nswap)); + + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_inblock)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_oublock)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_nvcsw)); + res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_nivcsw)); + + return res; +} + char *mxq_job_status_to_name(uint64_t status) { switch (status) { @@ -264,6 +359,119 @@ void mxq_job_free_content(struct mxq_job *j) j->job_argv = NULL; } +int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id) +{ + int res; + struct mxq_job *jobs = NULL; + struct mxq_job j = {0}; + struct mx_mysql_bind param = {0}; + struct mx_mysql_bind result = {0}; + + assert(mysql); + assert(mxq_jobs); + assert(!(*mxq_jobs)); + + char *query = + "SELECT" + JOB_FIELDS + " FROM mxq_job" + " WHERE job_id = ?" + " LIMIT 1"; + + res = mx_mysql_bind_init_param(¶m, 1); + assert(res == 0); + res = mx_mysql_bind_var(¶m, 0, uint64, &job_id); + assert(res == 0); + + res = bind_result_job_fields(&result, &j); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + *mxq_jobs = jobs; + return res; +} + +int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp) +{ + int res; + struct mxq_job *jobs = NULL; + struct mxq_job j = {0}; + struct mx_mysql_bind param = {0}; + struct mx_mysql_bind result = {0}; + + assert(mysql); + assert(mxq_jobs); + assert(!(*mxq_jobs)); + + char *query = + "SELECT" + JOB_FIELDS + " FROM mxq_job" + " WHERE group_id = ? OR 1 = 0" + " ORDER BY server_id, host_hostname, job_id"; + + res = mx_mysql_bind_init_param(¶m, 1); + assert(res == 0); + res = mx_mysql_bind_var(¶m, 0, uint64, &(grp->group_id)); + assert(res == 0); + + res = bind_result_job_fields(&result, &j); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + *mxq_jobs = jobs; + return res; +} + +int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status) +{ + int res; + struct mxq_job *jobs = NULL; + struct mxq_job j = {0}; + struct mx_mysql_bind param = {0}; + struct mx_mysql_bind result = {0}; + + assert(mysql); + assert(mxq_jobs); + assert(!(*mxq_jobs)); + + char *query = + "SELECT" + JOB_FIELDS + " FROM mxq_job" + " WHERE group_id = ?" + " AND job_status = ?" + " ORDER BY server_id, host_hostname, job_id"; + + res = mx_mysql_bind_init_param(¶m, 2); + assert(res == 0); + res = mx_mysql_bind_var(¶m, 0, uint64, &(grp->group_id)); + res = mx_mysql_bind_var(¶m, 1, uint64, &job_status); + assert(res == 0); + + res = bind_result_job_fields(&result, &j); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + *mxq_jobs = jobs; + return res; +} + int mxq_job_update_status_assigned(MYSQL *mysql, struct mxq_job *job) { char *query; diff --git a/mxq_job.h b/mxq_job.h index a8e23b1a..674da681 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -60,7 +60,6 @@ struct mxq_job { struct rusage stats_rusage; }; - #define MXQ_JOB_STATUS_INQ 0 #define MXQ_JOB_STATUS_ASSIGNED 100 #define MXQ_JOB_STATUS_LOADED 150 @@ -83,13 +82,15 @@ struct mxq_job { #define MXQ_JOB_FLAGS_AUTORESTART (1<<62) #define MXQ_JOB_FLAGS_HOSTFAIL (1<<63) - #define _to_string(s) #s #define status_str(x) _to_string(x) - char *mxq_job_status_to_name(uint64_t status); +int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id); +int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp); +int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status); + int mxq_job_load_assigned(MYSQL *mysql, struct mxq_job *job, char *hostname, char *server_id); void mxq_job_free_content(struct mxq_job *j); int mxq_job_load(MYSQL *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id); diff --git a/mxqdump.c b/mxqdump.c index 84978ff2..9c98efdb 100644 --- a/mxqdump.c +++ b/mxqdump.c @@ -30,73 +30,6 @@ #define UINT64_SPECIAL_MIN (uint64_t)(-2) #define UINT64_HASVALUE(x) ((x) < UINT64_SPECIAL_MIN) -#define GROUP_FIELDS \ - " group_id," \ - " group_name," \ - " group_status," \ - " group_priority," \ - " user_uid," \ - " user_name," \ - " user_gid," \ - " user_group," \ - " job_command," \ - " job_threads," \ - " job_memory," \ - " job_time," \ - " group_jobs," \ - " group_jobs_inq," \ - " group_jobs_running," \ - " group_jobs_finished," \ - " group_jobs_failed," \ - " group_jobs_cancelled," \ - " group_jobs_unknown," \ - " group_slots_running," \ - " stats_max_maxrss," \ - " stats_max_utime_sec," \ - " stats_max_stime_sec," \ - " stats_max_real_sec" - -#define JOB_FIELDS \ - " job_id, " \ - " job_status, " \ - " job_flags, " \ - " job_priority, " \ - " group_id, " \ - \ - " job_workdir, " \ - " job_argc, " \ - " job_argv, " \ - " job_stdout, " \ - " job_stderr, " \ - \ - " job_umask, " \ - " host_submit, " \ - " server_id, " \ - " host_hostname, " \ - " host_pid, " \ - \ - " host_slots, " \ - " UNIX_TIMESTAMP(date_submit) as date_submit, " \ - " UNIX_TIMESTAMP(date_start) as date_start, " \ - " UNIX_TIMESTAMP(date_end) as date_end, " \ - " stats_status, " \ - \ - " stats_utime_sec, " \ - " stats_utime_usec, " \ - " stats_stime_sec, " \ - " stats_stime_usec, " \ - " stats_real_sec, " \ - \ - " stats_real_usec, " \ - " stats_maxrss, " \ - " stats_minflt, " \ - " stats_majflt, " \ - " stats_nswap, " \ - \ - " stats_inblock, " \ - " stats_oublock, " \ - " stats_nvcsw, " \ - " stats_nivcsw" static void print_usage(void) { @@ -172,469 +105,6 @@ static void print_usage(void) ); } -static int bind_result_job_fields(struct mx_mysql_bind *result, struct mxq_job *j) -{ - int res = 0; - int idx = 0; - - res = mx_mysql_bind_init_result(result, 34); - assert(res >= 0); - - res += mx_mysql_bind_var(result, idx++, uint64, &(j->job_id)); - res += mx_mysql_bind_var(result, idx++, uint16, &(j->job_status)); - res += mx_mysql_bind_var(result, idx++, uint64, &(j->job_flags)); - res += mx_mysql_bind_var(result, idx++, uint16, &(j->job_priority)); - res += mx_mysql_bind_var(result, idx++, uint64, &(j->group_id)); - - res += mx_mysql_bind_var(result, idx++, string, &(j->job_workdir)); - res += mx_mysql_bind_var(result, idx++, uint16, &(j->job_argc)); - res += mx_mysql_bind_var(result, idx++, string, &(j->job_argv_str)); - res += mx_mysql_bind_var(result, idx++, string, &(j->job_stdout)); - res += mx_mysql_bind_var(result, idx++, string, &(j->job_stderr)); - - res += mx_mysql_bind_var(result, idx++, uint32, &(j->job_umask)); - res += mx_mysql_bind_var(result, idx++, string, &(j->host_submit)); - res += mx_mysql_bind_var(result, idx++, string, &(j->server_id)); - res += mx_mysql_bind_var(result, idx++, string, &(j->host_hostname)); - res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_pid)); - - res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_slots)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->date_submit)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->date_start)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->date_end)); - res += mx_mysql_bind_var(result, idx++, int32, &(j->stats_status)); - - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_utime.tv_sec)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_utime.tv_usec)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_stime.tv_sec)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_stime.tv_usec)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_realtime.tv_sec)); - - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_realtime.tv_usec)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_maxrss)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_minflt)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_majflt)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_nswap)); - - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_inblock)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_oublock)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_nvcsw)); - res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_nivcsw)); - - return res; -} - -static int bind_result_group_fields(struct mx_mysql_bind *result, struct mxq_group *g) -{ - int res = 0; - int idx = 0; - - res = mx_mysql_bind_init_result(result, 24); - assert(res >= 0); - - res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_id)); - res += mx_mysql_bind_var(result, idx++, string, &(g->group_name)); - res += mx_mysql_bind_var(result, idx++, uint8, &(g->group_status)); - res += mx_mysql_bind_var(result, idx++, uint16, &(g->group_priority)); - - res += mx_mysql_bind_var(result, idx++, uint32, &(g->user_uid)); - res += mx_mysql_bind_var(result, idx++, string, &(g->user_name)); - res += mx_mysql_bind_var(result, idx++, uint32, &(g->user_gid)); - res += mx_mysql_bind_var(result, idx++, string, &(g->user_group)); - - res += mx_mysql_bind_var(result, idx++, string, &(g->job_command)); - - res += mx_mysql_bind_var(result, idx++, uint16, &(g->job_threads)); - res += mx_mysql_bind_var(result, idx++, uint64, &(g->job_memory)); - res += mx_mysql_bind_var(result, idx++, uint32, &(g->job_time)); - - res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs)); - res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_inq)); - res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_running)); - res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_finished)); - res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_failed)); - res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_cancelled)); - res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_jobs_unknown)); - - res += mx_mysql_bind_var(result, idx++, uint64, &(g->group_slots_running)); - - res += mx_mysql_bind_var(result, idx++, uint32, &(g->stats_max_maxrss)); - res += mx_mysql_bind_var(result, idx++, int64, &(g->stats_max_utime.tv_sec)); - res += mx_mysql_bind_var(result, idx++, int64, &(g->stats_max_stime.tv_sec)); - res += mx_mysql_bind_var(result, idx++, int64, &(g->stats_max_real.tv_sec)); - - return res; -} - -static int load_active_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid) -{ - int res; - struct mxq_group *groups = NULL; - struct mxq_group g = {0}; - struct mx_mysql_bind result = {0}; - struct mx_mysql_bind param = {0}; - - assert(mysql); - assert(mxq_groups); - assert(!(*mxq_groups)); - - char *query = - "SELECT" - GROUP_FIELDS - " FROM mxq_group" - " WHERE ((group_jobs_inq > 0 OR group_jobs_running > 0)" - " OR (NOW()-group_mtime < 604800))" - " AND user_uid = ?" - " ORDER BY user_name, group_mtime" - " LIMIT 1000"; - - res = mx_mysql_bind_init_param(¶m, 1); - assert(res == 0); - res = mx_mysql_bind_var(¶m, 0, uint64, &user_uid); - assert(res == 0); - - res = bind_result_group_fields(&result, &g); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &g, (void **)&groups, sizeof(*groups)); - if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } - - *mxq_groups = groups; - return res; -} - -static int load_running_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups) -{ - int res; - struct mxq_group *groups = NULL; - struct mxq_group g = {0}; - struct mx_mysql_bind result = {0}; - - assert(mysql); - assert(mxq_groups); - assert(!(*mxq_groups)); - - char *query = - "SELECT" - GROUP_FIELDS - " FROM mxq_group" - " WHERE (group_jobs_inq > 0 OR group_jobs_running > 0)" - " ORDER BY user_name, group_mtime" - " LIMIT 1000"; - - res = bind_result_group_fields(&result, &g); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, NULL, &result, &g, (void **)&groups, sizeof(*groups)); - if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } - - *mxq_groups = groups; - return res; -} - -static int load_running_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid) -{ - int res; - struct mxq_group *groups = NULL; - struct mxq_group g = {0}; - struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; - - assert(mysql); - assert(mxq_groups); - assert(!(*mxq_groups)); - - char *query = - "SELECT" - GROUP_FIELDS - " FROM mxq_group" - " WHERE (group_jobs_inq > 0 OR group_jobs_running > 0)" - " AND user_uid = ?" - " ORDER BY user_name, group_mtime" - " LIMIT 1000"; - - res = mx_mysql_bind_init_param(¶m, 1); - assert(res == 0); - res = mx_mysql_bind_var(¶m, 0, uint64, &user_uid); - assert(res == 0); - - res = bind_result_group_fields(&result, &g); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &g, (void **)&groups, sizeof(*groups)); - if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } - - *mxq_groups = groups; - return res; -} - -static int load_all_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid) -{ - int res; - struct mxq_group *groups = NULL; - struct mxq_group g = {0}; - struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; - - assert(mysql); - assert(mxq_groups); - assert(!(*mxq_groups)); - - char *query = - "SELECT" - GROUP_FIELDS - " FROM mxq_group" - " WHERE user_uid = ?" - " ORDER BY user_name, group_mtime" - " LIMIT 1000"; - - res = mx_mysql_bind_init_param(¶m, 1); - assert(res == 0); - res = mx_mysql_bind_var(¶m, 0, uint64, &user_uid); - assert(res == 0); - - res = bind_result_group_fields(&result, &g); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &g, (void **)&groups, sizeof(*groups)); - if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } - - *mxq_groups = groups; - return res; -} - -static int load_all_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups) -{ - int res; - struct mxq_group *groups = NULL; - struct mxq_group g = {0}; - struct mx_mysql_bind result = {0}; - - assert(mysql); - assert(mxq_groups); - assert(!(*mxq_groups)); - - char *query = - "SELECT" - GROUP_FIELDS - " FROM mxq_group" - " ORDER BY user_name, group_mtime" - " LIMIT 1000"; - - res = bind_result_group_fields(&result, &g); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, NULL, &result, &g, (void **)&groups, sizeof(*groups)); - if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } - - *mxq_groups = groups; - return res; -} - -static int load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id) -{ - int res; - struct mxq_job *jobs = NULL; - struct mxq_job j = {0}; - struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; - - assert(mysql); - assert(mxq_jobs); - assert(!(*mxq_jobs)); - - char *query = - "SELECT" - JOB_FIELDS - " FROM mxq_job" - " WHERE job_id = ?" - " LIMIT 1"; - - res = mx_mysql_bind_init_param(¶m, 1); - assert(res == 0); - res = mx_mysql_bind_var(¶m, 0, uint64, &job_id); - assert(res == 0); - - res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); - if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } - - *mxq_jobs = jobs; - return res; -} - -static int load_jobs(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp) -{ - int res; - struct mxq_job *jobs = NULL; - struct mxq_job j = {0}; - struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; - - assert(mysql); - assert(mxq_jobs); - assert(!(*mxq_jobs)); - - char *query = - "SELECT" - JOB_FIELDS - " FROM mxq_job" - " WHERE group_id = ? OR 1 = 0" - " ORDER BY server_id, host_hostname, job_id"; - - res = mx_mysql_bind_init_param(¶m, 1); - assert(res == 0); - res = mx_mysql_bind_var(¶m, 0, uint64, &(grp->group_id)); - assert(res == 0); - - res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); - if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } - - *mxq_jobs = jobs; - return res; -} - -static int load_jobs_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status) -{ - int res; - struct mxq_job *jobs = NULL; - struct mxq_job j = {0}; - struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; - - assert(mysql); - assert(mxq_jobs); - assert(!(*mxq_jobs)); - - char *query = - "SELECT" - JOB_FIELDS - " FROM mxq_job" - " WHERE group_id = ?" - " AND job_status = ?" - " ORDER BY server_id, host_hostname, job_id"; - - res = mx_mysql_bind_init_param(¶m, 2); - assert(res == 0); - res = mx_mysql_bind_var(¶m, 0, uint64, &(grp->group_id)); - res = mx_mysql_bind_var(¶m, 1, uint64, &job_status); - assert(res == 0); - - res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); - if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } - - *mxq_jobs = jobs; - return res; -} - -static int load_running_jobs_by_group_id(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t group_id) -{ - int res; - struct mxq_job *jobs = NULL; - struct mxq_job j = {0}; - struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; - - assert(mysql); - assert(mxq_jobs); - assert(!(*mxq_jobs)); - - char *query = - "SELECT" - JOB_FIELDS - " FROM mxq_job" - " WHERE group_id = ?" - " AND job_status = 200" - " ORDER BY server_id, host_hostname, job_id"; - - res = mx_mysql_bind_init_param(¶m, 1); - assert(res == 0); - res = mx_mysql_bind_var(¶m, 0, uint64, &group_id); - assert(res == 0); - - res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); - if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } - - *mxq_jobs = jobs; - return res; -} - - -static int load_group(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t group_id) -{ - int res; - struct mxq_group *groups = NULL; - struct mxq_group g = {0}; - struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; - - assert(mysql); - assert(mxq_groups); - assert(!(*mxq_groups)); - - char *query = - "SELECT" - GROUP_FIELDS - " FROM mxq_group" - " WHERE group_id = ?" - " LIMIT 1"; - - res = mx_mysql_bind_init_param(¶m, 1); - assert(res == 0); - res = mx_mysql_bind_var(¶m, 0, uint64, &group_id); - assert(res == 0); - - res = bind_result_group_fields(&result, &g); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &g, (void **)&groups, sizeof(*groups)); - if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } - - *mxq_groups = groups; - return res; -} - static int print_group(struct mxq_group *g) { return printf("user=%s uid=%u group_id=%lu pri=%d jobs_total=%lu run_jobs=%lu run_slots=%lu failed=%lu" @@ -717,7 +187,6 @@ static int print_job(struct mxq_group *g, struct mxq_job *j) j->job_argv_str); } - static int dump_group(struct mx_mysql *mysql, uint64_t group_id) { struct mxq_group *grp, *groups = NULL; @@ -730,7 +199,7 @@ static int dump_group(struct mx_mysql *mysql, uint64_t group_id) assert(mysql); assert(UINT64_HASVALUE(group_id)); - grp_cnt = load_group(mysql, &groups, group_id); + grp_cnt = mxq_load_group(mysql, &groups, group_id); if (!grp_cnt) return 0; @@ -757,17 +226,17 @@ static int dump_groups(struct mx_mysql *mysql, uint64_t status, uint64_t user_ui if (status == MXQ_JOB_STATUS_RUNNING) { if (UINT64_HASVALUE(user_uid)) { - grp_cnt = load_running_groups_for_user(mysql, &groups, user_uid); + grp_cnt = mxq_load_running_groups_for_user(mysql, &groups, user_uid); } else { assert(user_uid == UINT64_ALL); - grp_cnt = load_running_groups(mysql, &groups); + grp_cnt = mxq_load_running_groups(mysql, &groups); } } else if (status == UINT64_ALL && user_uid == UINT64_ALL) { - grp_cnt = load_all_groups(mysql, &groups); + grp_cnt = mxq_load_all_groups(mysql, &groups); } else if (status == UINT64_ALL && UINT64_HASVALUE(user_uid)) { - grp_cnt = load_all_groups_for_user(mysql, &groups, user_uid); + grp_cnt = mxq_load_all_groups_for_user(mysql, &groups, user_uid); } else { - grp_cnt = load_active_groups_for_user(mysql, &groups, user_uid); + grp_cnt = mxq_load_active_groups_for_user(mysql, &groups, user_uid); } for (g = 0; g < grp_cnt; g++) { @@ -797,14 +266,14 @@ static int dump_job(struct mx_mysql *mysql, uint64_t job_id) assert(mysql); assert(UINT64_HASVALUE(job_id)); - job_cnt = load_job(mysql, &jobs, job_id); + job_cnt = mxq_load_job(mysql, &jobs, job_id); if (!job_cnt) { return 0; } job = &jobs[0]; - grp_cnt = load_group(mysql, &groups, job->group_id); + grp_cnt = mxq_load_group(mysql, &groups, job->group_id); if (!grp_cnt) { mx_log_err("can'load group with group_id='%lu' for job with job_id='%lu'", job->group_id, job_id); mxq_job_free_content(job); @@ -842,15 +311,15 @@ static int dump_jobs(struct mx_mysql *mysql, uint64_t group_id, uint64_t job_sta if (UINT64_HASVALUE(group_id)) { assert(user_uid == UINT64_ALL); - grp_cnt = load_group(mysql, &groups, group_id); + grp_cnt = mxq_load_group(mysql, &groups, group_id); } else { assert(group_id == UINT64_ALL); assert(job_status == MXQ_JOB_STATUS_RUNNING); if (UINT64_HASVALUE(user_uid)) - grp_cnt = load_running_groups_for_user(mysql, &groups, user_uid); + grp_cnt = mxq_load_running_groups_for_user(mysql, &groups, user_uid); else - grp_cnt = load_running_groups(mysql, &groups); + grp_cnt = mxq_load_running_groups(mysql, &groups); } mx_debug_value("%lu", grp_cnt); @@ -859,9 +328,9 @@ static int dump_jobs(struct mx_mysql *mysql, uint64_t group_id, uint64_t job_sta grp = &groups[g]; if (UINT64_HASVALUE(job_status)) - job_cnt = load_jobs_with_status(mysql, &jobs, grp, job_status); + job_cnt = mxq_load_jobs_in_group_with_status(mysql, &jobs, grp, job_status); else - job_cnt = load_jobs(mysql, &jobs, grp); + job_cnt = mxq_load_jobs_in_group(mysql, &jobs, grp); mx_debug_value("%lu", job_cnt); @@ -1281,4 +750,3 @@ int main(int argc, char *argv[]) mx_log_info("MySQL: Connection to database closed."); return 0; }; -