diff --git a/.gitignore b/.gitignore index 14d2c8c..d9ac12b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,6 @@ mx_util.o mxq_group.o mxqdump.o mxq_job.o -mxq_mysql.o mxqd.o mxqsub.o mxqkill.o diff --git a/Makefile b/Makefile index e0dc2b6..e8d5347 100644 --- a/Makefile +++ b/Makefile @@ -84,11 +84,10 @@ LDLIBS_MYSQL := $(shell $(MYSQL_CONFIG) --libs) CFLAGS_MYSQL += ${CFLAGS_MXQ_MYSQL_DEFAULT_FILE} CFLAGS_MYSQL += ${CFLAGS_MXQ_MYSQL_DEFAULT_GROUP} +CFLAGS_MYSQL += -DMX_MYSQL_FAIL_WAIT_DEFAULT=5 CFLAGS += -g CFLAGS += -Wall -CFLAGS += -Wno-unused-variable -CFLAGS += -Wno-unused-function CFLAGS += -DMXQ_VERSION=\"${MXQ_VERSION}\" CFLAGS += -DMXQ_VERSIONFULL=\"${MXQ_VERSIONFULL}\" CFLAGS += -DMXQ_VERSIONDATE=\"${MXQ_VERSIONDATE}\" @@ -185,11 +184,6 @@ mx_mysql.h += $(mx_util.h) mx_mxq.h += mx_mxq.h -### mxq_mysql.h -------------------------------------------------------- - -mxq_mysql.h += mxq_mysql.h -mxq_mysql.h += $(mxq_util.h) - ### mxq_util.h --------------------------------------------------------- mxq_util.h += mxq_util.h @@ -253,20 +247,11 @@ mxq_log.o: $(mx_log.h) clean: CLEAN += mxq_log.o -### mxq_mysql.o -------------------------------------------------------- - -mxq_mysql.o: $(mx_log.h) -mxq_mysql.o: $(mxq_mysql.h) -mxq_mysql.o: $(mxq_util.h) -mxq_mysql.o: CFLAGS += $(CFLAGS_MYSQL) - -clean: CLEAN += mxq_mysql.o - ### mxqdump.o --------------------------------------------------- mxqdump.o: $(mx_log.h) mxqdump.o: $(mxq_util.h) -mxqdump.o: $(mxq_mysql.h) +mxqdump.o: $(mx_mysql.h) mxqdump.o: $(mx_getopt.h) mxqdump.o: CFLAGS += $(CFLAGS_MYSQL) @@ -290,6 +275,7 @@ clean: CLEAN += mxqkill.o mxq_util.o: $(mx_log.h) mxq_util.o: $(mxq_util.h) mxq_util.o: CFLAGS += $(CFLAGS_MYSQL) +mxq_util.o: CFLAGS += -Wno-unused-variable clean: CLEAN += mxq_util.o @@ -297,7 +283,7 @@ clean: CLEAN += mxq_util.o mxq_group.o: $(mx_log.h) mxq_group.o: $(mxq_group.h) -mxq_group.o: $(mxq_mysql.h) +mxq_group.o: $(mx_mysql.h) mxq_group.o: CFLAGS += $(CFLAGS_MYSQL) clean: CLEAN += mxq_group.o @@ -308,7 +294,7 @@ mxq_job.o: $(mx_util.h) mxq_job.o: $(mx_log.h) mxq_job.o: $(mxq_job.h) mxq_job.o: $(mxq_group.h) -mxq_job.o: $(mxq_mysql.h) +mxq_job.o: $(mx_mysql.h) mxq_job.o: CFLAGS += $(CFLAGS_MYSQL) clean: CLEAN += mxq_job.o @@ -322,7 +308,7 @@ mxqd.o: $(mx_log.h) mxqd.o: $(mxqd.h) mxqd.o: $(mxq_group.h) mxqd.o: $(mxq_job.h) -mxqd.o: $(mxq_mysql.h) +mxqd.o: $(mx_mysql.h) mxqd.o: CFLAGS += $(CFLAGS_MYSQL) mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_PATH) mxqd.o: CFLAGS += -Wno-unused-but-set-variable @@ -355,7 +341,6 @@ mxqd: mx_getopt.o mxqd: mxq_group.o mxqd: mxq_job.o mxqd: mxq_util.o -mxqd: mxq_mysql.o mxqd: mx_mysql.o mxqd: LDLIBS += $(LDLIBS_MYSQL) @@ -388,7 +373,6 @@ mxqdump: mx_log.o mxqdump: mx_mysql.o mxqdump: mxq_group.o mxqdump: mxq_job.o -mxqdump: mxq_mysql.o mxqdump: mxq_util.o mxqdump: mx_util.o mxqdump: mx_getopt.o diff --git a/mx_mysql.c b/mx_mysql.c index 12e0766..0ef434b 100644 --- a/mx_mysql.c +++ b/mx_mysql.c @@ -36,11 +36,13 @@ #define mx__mysql_log_emerg(mysql) mx__mysql_log(emerg, (mysql)) #define mx__mysql_log_err(mysql) mx__mysql_log(err, (mysql)) +#define mx__mysql_log_warning(mysql) mx__mysql_log(warning, (mysql)) #define mx__mysql_log_info(mysql) mx__mysql_log(info, (mysql)) #define mx__mysql_log_debug(mysql) mx__mysql_log(debug, (mysql)) #define mx__mysql_stmt_log_emerg(stmt) mx__mysql_stmt_log(emerg, (stmt)) #define mx__mysql_stmt_log_err(stmt) mx__mysql_stmt_log(err, (stmt)) +#define mx__mysql_stmt_log_warning(stmt) mx__mysql_stmt_log(warning, (stmt)) #define mx__mysql_stmt_log_info(stmt) mx__mysql_stmt_log(info, (stmt)) #define mx__mysql_stmt_log_debug(stmt) mx__mysql_stmt_log(debug, (stmt)) @@ -206,6 +208,16 @@ static inline int mx__mysql_ping(struct mx_mysql *mysql) mx__mysql_log_emerg(mysql); return -(errno=EPROTO); + case CR_OUT_OF_MEMORY: + return -(errno=ENOMEM); + + case CR_CONN_HOST_ERROR: + case CR_CONNECTION_ERROR: + case CR_IPSOCK_ERROR: + case CR_SOCKET_CREATE_ERROR: + case CR_UNKNOWN_HOST: + case CR_VERSION_ERROR: + case CR_SERVER_LOST: case CR_SERVER_GONE_ERROR: return -(errno=EAGAIN); @@ -699,6 +711,42 @@ static inline int _mx_mysql_bind_integer(struct mx_mysql_bind *b, unsigned int i return 0; } +void _mx_mysql_bind_dump_index(struct mx_mysql_bind *b, unsigned int index) +{ + mx_debug_value("%d", index); + mx_debug_value("%d", b->bind[index].buffer_type); + mx_debug_value("%d", b->bind[index].buffer_length); + mx_debug_value("%p", b->bind[index].buffer); + if (b->bind[index].buffer_type == MYSQL_TYPE_STRING) + mx_debug_value("%s", b->bind[index].buffer); + mx_debug_value("%d", b->bind[index].is_unsigned); + mx_debug_value("%d", *b->bind[index].length); + mx_debug_value("%d", *b->bind[index].is_null); + mx_debug_value("%d", *b->bind[index].error); + mx_debug_value("0x%x", b->data[index].flags); +} + +void _mx_mysql_bind_dump(struct mx_mysql_bind *b) +{ + int i; + + mx_log_debug("entered"); + + if (!b) { + mx_log_debug("done"); + return; + } + + mx_debug_value("%d", b->type); + mx_debug_value("%d", b->count); + + for (i=0; i < b->count; i++) { + _mx_mysql_bind_dump_index(b, i); + } + mx_log_debug("done"); +} + + static inline int _mx_mysql_bind_string(struct mx_mysql_bind *b, unsigned int index, char **value) { mx_assert_return_minus_errno(b, EINVAL); @@ -755,31 +803,39 @@ static inline int _mx_mysql_bind_validate(struct mx_mysql_bind *b) /**********************************************************************/ -int mx_mysql_init(struct mx_mysql **mysql) +int mx_mysql_initialize(struct mx_mysql **mysql) { struct mx_mysql *m; - int res; mx_assert_return_minus_errno(mysql, EINVAL); mx_assert_return_minus_errno(!(*mysql), EUCLEAN); m = mx_calloc_forever(1, sizeof(*m)); + *mysql = m; + + return 0; +} + +int mx_mysql_init(struct mx_mysql *mysql) +{ + int res; + + mx_assert_return_minus_errno(mysql, EINVAL); + do { - res = mx__mysql_init(m); + res = mx__mysql_init(mysql); if (res == 0) break; if (res != -ENOMEM) return res; - mx_log_debug("mx__mysql_init() failed: %m - retrying (forever) in %d second(s).", MX_CALLOC_FAIL_WAIT_DEFAULT); - mx_sleep(MX_CALLOC_FAIL_WAIT_DEFAULT); + mx_log_debug("mx__mysql_init() failed: %m - retrying (forever) in %d second(s).", MX_MYSQL_FAIL_WAIT_DEFAULT); + mx_sleep(MX_MYSQL_FAIL_WAIT_DEFAULT); } while (1); - *mysql = m; - return 0; } @@ -883,7 +939,12 @@ int mx_mysql_connect(struct mx_mysql **mysql) mx_assert_return_minus_errno(mysql, EINVAL); if (!(*mysql)) { - res = mx_mysql_init(mysql); + res = mx_mysql_initialize(mysql); + if (res < 0) + return res; + } + if (!(*mysql)->mysql) { + res = mx_mysql_init(*mysql); if (res < 0) return res; } @@ -897,8 +958,9 @@ int mx_mysql_connect_forever_sec(struct mx_mysql **mysql, unsigned int seconds) int res; while ((res = mx_mysql_connect(mysql)) < 0) { - mx__mysql_log_info(*mysql); + mx__mysql_log_warning(*mysql); mx_mysql_assert_usage_ok(res); + mx_log_warning("mx_mysql_connect() failed: %m - retrying (forever) in %d second(s).", seconds); mx_sleep(seconds); } @@ -955,6 +1017,33 @@ int mx_mysql_ping(struct mx_mysql *mysql) return mx__mysql_ping(mysql); } +int mx_mysql_ping_forever(struct mx_mysql *mysql) +{ + int res; + int fail = 0; + + mx_assert_return_minus_errno(mysql, EINVAL); + + while (1) { + res = mx_mysql_ping(mysql); + if (res == 0) + break; + + fail++; + + mx__mysql_log_warning(mysql); + mx_mysql_assert_usage_ok(res); + mx_log_warning("mx_mysql_ping() failed: %m - retrying again (forever) in %d second(s).", MX_MYSQL_FAIL_WAIT_DEFAULT); + mx_sleep(MX_MYSQL_FAIL_WAIT_DEFAULT); + } + + if (fail) + mx_log_info("mx_mysql_ping_forever() recovered from previous errors (%d tries). Yippieh! Back to work!", fail); + + return res; +} + + int mx_mysql_queryf(struct mx_mysql *mysql, const char *fmt, ...) { mx_assert_return_minus_errno(mysql, EINVAL); @@ -1001,8 +1090,8 @@ int mx_mysql_statement_init(struct mx_mysql *mysql, struct mx_mysql_stmt **stmt) if (res != -ENOMEM) return res; - mx_log_debug("mx__mysql_stmt_init() failed: %m - retrying (forever) in %d second(s).", MX_CALLOC_FAIL_WAIT_DEFAULT); - mx_sleep(MX_CALLOC_FAIL_WAIT_DEFAULT); + mx_log_debug("mx__mysql_stmt_init() failed: %m - retrying (forever) in %d second(s).", MX_MYSQL_FAIL_WAIT_DEFAULT); + mx_sleep(MX_MYSQL_FAIL_WAIT_DEFAULT); } while (1); *stmt = s; @@ -1088,7 +1177,7 @@ int mx_mysql_statement_fetch(struct mx_mysql_stmt *stmt) } res = mx__mysql_stmt_fetch(stmt); - if (res == -ENOENT || res == 0) + if (res == -ENOENT) return 0; if (res < 0 && res != -ERANGE) { @@ -1186,7 +1275,6 @@ int mx_mysql_bind_cleanup(struct mx_mysql_bind *bind) { mx_assert_return_minus_errno(bind, EINVAL); - mx_assert_return_minus_errno(bind->type != MX_MYSQL_BIND_TYPE_UNKNOWN, EBADF); mx_free_null(bind->bind); mx_free_null(bind->data); @@ -1246,9 +1334,12 @@ int mx_mysql_do_statement(struct mx_mysql *mysql, char *query, struct mx_mysql_b assert(mysql); + mx_mysql_ping_forever(mysql); + stmt = mx_mysql_statement_prepare_with_bindings(mysql, query, param, result); if (!stmt) { - mx_log_err("mx_mysql_statement_prepare(): %m"); + mx_log_err("mx_mysql_statement_prepare_with_bindings(): %m"); + mx_log_err("mx_mysql_statement_prepare_with_bindings(): query was: %s", query); return -errno; } @@ -1276,9 +1367,35 @@ int mx_mysql_do_statement(struct mx_mysql *mysql, char *query, struct mx_mysql_b } mx_mysql_statement_close(&stmt); - return cnt; + return num_rows; +} + +int mx_mysql_do_statement_retry_on_fail(struct mx_mysql *mysql, char *query, struct mx_mysql_bind *param, struct mx_mysql_bind *result, void *from, void **to, size_t size) +{ + int res; + + mx_log_debug("entered"); + + while (1) { + res = mx_mysql_do_statement(mysql, query, param, result, from, to, size); + + if (res >= 0) + break; + + mx_mysql_assert_usage_ok(res); + + mx_log_warning("mx_mysql_do_statement() failed: %m"); + + if (res != -EAGAIN) + break; + + mx_mysql_ping_forever(mysql); + } + + return res; } + struct mx_mysql_stmt *mx_mysql_statement_prepare_with_bindings(struct mx_mysql *mysql, char *statement, struct mx_mysql_bind *param, struct mx_mysql_bind *result) { int res; @@ -1316,6 +1433,9 @@ struct mx_mysql_stmt *mx_mysql_statement_prepare_with_bindings(struct mx_mysql * return stmt; }; + if (res < 0) + mx__mysql_stmt_log_warning(stmt); + mx_mysql_statement_close(&stmt); return NULL; @@ -1323,9 +1443,6 @@ struct mx_mysql_stmt *mx_mysql_statement_prepare_with_bindings(struct mx_mysql * struct mx_mysql_stmt *mx_mysql_statement_prepare(struct mx_mysql *mysql, char *statement) { - int res; - struct mx_mysql_stmt *stmt = NULL; - mx_assert_return_NULL(mysql, EINVAL); mx_assert_return_NULL(statement, EINVAL); mx_assert_return_NULL(*statement, EINVAL); diff --git a/mx_mysql.h b/mx_mysql.h index 1e5718e..380689c 100644 --- a/mx_mysql.h +++ b/mx_mysql.h @@ -97,14 +97,18 @@ struct mx_mysql_stmt { #define mx_mysql_statement_param_bind(s, i, t, p) mx_mysql_bind_var(&((s)->param), (i), t, (p)) #define mx_mysql_statement_result_bind(s, i, t, p) mx_mysql_bind_var(&((s)->result), (i), t, (p)) -int mx_mysql_init(struct mx_mysql **); +int mx_mysql_initialize(struct mx_mysql **mysql); +int mx_mysql_init(struct mx_mysql *mysql); + int mx_mysql_free(struct mx_mysql **mysql); int mx_mysql_option_set_default_file(struct mx_mysql *mysql, char *fname); int mx_mysql_option_set_default_group(struct mx_mysql *mysql, char *group); +int mx_mysql_option_set_reconnect(struct mx_mysql *mysql, int reconnect); char *mx_mysql_option_get_default_file(struct mx_mysql *mysql); char *mx_mysql_option_get_default_group(struct mx_mysql *mysql); +int mx_mysql_option_get_reconnect(struct mx_mysql *mysql); int mx_mysql_connect(struct mx_mysql **mysql); int mx_mysql_connect_forever_sec(struct mx_mysql **mysql, unsigned int seconds); @@ -116,9 +120,18 @@ int mx_mysql_end(void); int mx_mysql_finish(struct mx_mysql **mysql); +int mx_mysql_ping(struct mx_mysql *mysql); +int mx_mysql_ping_forever(struct mx_mysql *mysql); + + #define mx_mysql_do_statement_noresult(m, q, p) \ mx_mysql_do_statement(m, q, p, NULL, NULL, NULL, 0) + +#define mx_mysql_do_statement_noresult_retry_on_fail(m, q, p) \ + mx_mysql_do_statement_retry_on_fail(m, q, p, NULL, NULL, NULL, 0) + int mx_mysql_do_statement(struct mx_mysql *mysql, char *query, struct mx_mysql_bind *param, struct mx_mysql_bind *result, void *from, void **to, size_t size); +int mx_mysql_do_statement_retry_on_fail(struct mx_mysql *mysql, char *query, struct mx_mysql_bind *param, struct mx_mysql_bind *result, void *from, void **to, size_t size); int mx_mysql_statement_init(struct mx_mysql *mysql, struct mx_mysql_stmt **stmt); struct mx_mysql_stmt *mx_mysql_statement_prepare(struct mx_mysql *mysql, char *statement); diff --git a/mxq_group.c b/mxq_group.c index 59634a2..39047aa 100644 --- a/mxq_group.c +++ b/mxq_group.c @@ -9,7 +9,7 @@ #include "mxq_group.h" #include "mxq_job.h" -#include "mxq_mysql.h" +#include "mxq_util.h" #include "mx_mysql.h" #define GROUP_FIELDS_CNT 26 @@ -41,57 +41,6 @@ " stats_max_stime_sec," \ " stats_max_real_sec" -#define MXQ_GROUP_FIELDS "group_id," \ - "group_name," \ - "group_status," \ - "group_priority," \ - "user_uid," \ - "user_name," \ - "user_gid," \ - "user_group," \ - "job_command," \ - "job_threads," \ - "job_memory," \ - "job_time," \ - "group_jobs," \ - "group_jobs_running," \ - "group_jobs_finished," \ - "group_jobs_failed," \ - "group_jobs_cancelled," \ - "group_jobs_unknown," \ - "group_slots_running," \ - "stats_max_maxrss," \ - "stats_max_utime_sec," \ - "stats_max_stime_sec," \ - "stats_max_real_sec" - -enum mxq_group_columns { - MXQ_GROUP_COL_GROUP_ID=0, - MXQ_GROUP_COL_GROUP_NAME, - MXQ_GROUP_COL_GROUP_STATUS, - MXQ_GROUP_COL_GROUP_PRIORITY, - MXQ_GROUP_COL_USER_UID, - MXQ_GROUP_COL_USER_NAME, - MXQ_GROUP_COL_USER_GID, - MXQ_GROUP_COL_USER_GROUP, - MXQ_GROUP_COL_JOB_COMMAND, - MXQ_GROUP_COL_JOB_THREADS, - MXQ_GROUP_COL_JOB_MEMORY, - MXQ_GROUP_COL_JOB_TIME, - MXQ_GROUP_COL_GROUP_JOBS, - MXQ_GROUP_COL_GROUP_JOBS_RUNNING, - MXQ_GROUP_COL_GROUP_JOBS_FINISHED, - MXQ_GROUP_COL_GROUP_JOBS_FAILED, - MXQ_GROUP_COL_GROUP_JOBS_CANCELLED, - MXQ_GROUP_COL_GROUP_JOBS_UNKNOWN, - MXQ_GROUP_COL_GROUP_SLOTS_RUNNING, - MXQ_GROUP_COL_STATS_MAX_MAXRSS, - MXQ_GROUP_COL_STATS_MAX_UTIME_SEC, - MXQ_GROUP_COL_STATS_MAX_STIME_SEC, - MXQ_GROUP_COL_STATS_MAX_REAL_SEC, - MXQ_GROUP_COL__END -}; - static int bind_result_group_fields(struct mx_mysql_bind *result, struct mxq_group *g) { int res = 0; @@ -137,69 +86,6 @@ static int bind_result_group_fields(struct mx_mysql_bind *result, struct mxq_gro return res; } -static inline int mxq_group_bind_results(MYSQL_BIND *bind, struct mxq_group *g) -{ - memset(bind, 0, sizeof(*bind)*MXQ_GROUP_COL__END); - - MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_ID, &g->group_id); - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_GROUP_COL_GROUP_NAME, &g->_group_name_length); - MXQ_MYSQL_BIND_UINT8(bind, MXQ_GROUP_COL_GROUP_STATUS, &g->group_status); - MXQ_MYSQL_BIND_UINT16(bind, MXQ_GROUP_COL_GROUP_PRIORITY, &g->group_priority); - - MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_USER_UID, &g->user_uid); - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_GROUP_COL_USER_NAME, &g->_user_name_length); - MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_USER_GID, &g->user_gid); - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_GROUP_COL_USER_GROUP, &g->_user_group_length); - - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_GROUP_COL_JOB_COMMAND, &g->_job_command_length); - - MXQ_MYSQL_BIND_UINT16(bind, MXQ_GROUP_COL_JOB_THREADS, &g->job_threads); - MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_JOB_MEMORY, &g->job_memory); - MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_JOB_TIME, &g->job_time); - - MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS, &g->group_jobs); - MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS_RUNNING, &g->group_jobs_running); - MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS_FINISHED, &g->group_jobs_finished); - MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS_FAILED, &g->group_jobs_failed); - MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS_CANCELLED, &g->group_jobs_cancelled); - MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS_UNKNOWN, &g->group_jobs_unknown); - - MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_SLOTS_RUNNING, &g->group_slots_running); - - MXQ_MYSQL_BIND_INT32(bind, MXQ_GROUP_COL_STATS_MAX_MAXRSS, &g->stats_max_maxrss); - MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_STATS_MAX_UTIME_SEC, &g->stats_max_utime.tv_sec); - MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_STATS_MAX_STIME_SEC, &g->stats_max_stime.tv_sec); - MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_STATS_MAX_REAL_SEC, &g->stats_max_real.tv_sec); - - return 1; -} - -static int mxq_group_fetch_results(MYSQL_STMT *stmt, MYSQL_BIND *bind, struct mxq_group *g) -{ - int res; - - memset(g, 0, sizeof(*g)); - - res = mxq_mysql_stmt_fetch_row(stmt); - if (!res) - return 0; - - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_GROUP_COL_GROUP_NAME, &(g->group_name), g->_group_name_length); - if (!res) - return 0; - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_GROUP_COL_USER_NAME, &(g->user_name), g->_user_name_length); - if (!res) - return 0; - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_GROUP_COL_USER_GROUP, &(g->user_group), g->_user_group_length); - if (!res) - return 0; - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_GROUP_COL_JOB_COMMAND, &(g->job_command), g->_job_command_length); - if (!res) - return 0; - - return 1; -} - void mxq_group_free_content(struct mxq_group *g) { free_null(g->group_name); @@ -394,6 +280,39 @@ int mxq_load_active_groups_for_user(struct mx_mysql *mysql, struct mxq_group **m return res; } +int mxq_load_active_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups) +{ + int res; + struct mxq_group *groups = NULL; + struct mxq_group g = {0}; + struct mx_mysql_bind result = {0}; + + assert(mysql); + assert(mxq_groups); + + *mxq_groups = NULL; + + char *query = + "SELECT" + GROUP_FIELDS + " FROM mxq_group" + " WHERE (group_jobs_inq > 0 OR group_jobs_running > 0)" + " ORDER BY user_name, group_mtime" + " LIMIT 1000"; + + res = bind_result_group_fields(&result, &g); + assert(res == 0); + + res = mx_mysql_do_statement_retry_on_fail(mysql, query, NULL, &result, &g, (void **)&groups, sizeof(*groups)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement_retry_on_fail(): %m"); + return res; + } + + *mxq_groups = groups; + return res; +} + int mxq_load_running_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups) { int res; @@ -465,40 +384,3 @@ int mxq_load_running_groups_for_user(struct mx_mysql *mysql, struct mxq_group ** return res; } -int mxq_group_load_active_groups(MYSQL *mysql, struct mxq_group **mxq_group) -{ - MYSQL_STMT *stmt; - MYSQL_BIND result[MXQ_GROUP_COL__END]; - struct mxq_group g; - struct mxq_group *groups; - char *query; - int cnt; - - *mxq_group = NULL; - - mxq_group_bind_results(result, &g); - - query = "SELECT " MXQ_GROUP_FIELDS - " FROM mxq_group" - " WHERE group_jobs-group_jobs_finished-group_jobs_failed-group_jobs_cancelled-group_jobs_unknown > 0" - " ORDER BY user_uid, group_priority DESC"; - - stmt = mxq_mysql_stmt_do_query(mysql, query, MXQ_GROUP_COL__END, NULL, result); - if (!stmt) { - mx_log_err("mxq_mysql_stmt_do_query(mysql=%p, stmt_str=\"%s\", field_count=%d, param=%p, result=%p)", mysql, query, MXQ_GROUP_COL__END, NULL, result); - return 0; - } - - cnt = 0; - groups = NULL; - while (mxq_group_fetch_results(stmt, result, &g)) { - groups = realloc_forever(groups, sizeof(*groups)*(cnt+1)); - memcpy(groups+cnt, &g, sizeof(*groups)); - cnt++; - } - - *mxq_group = groups; - - mysql_stmt_close(stmt); - return cnt; -} diff --git a/mxq_group.h b/mxq_group.h index 65e293e..577701b 100644 --- a/mxq_group.h +++ b/mxq_group.h @@ -53,19 +53,18 @@ struct mxq_group { #define MXQ_GROUP_STATUS_OK 0 #define MXQ_GROUP_STATUS_CANCELLED 99 +void mxq_group_free_content(struct mxq_group *g); + inline uint64_t mxq_group_jobs_done(struct mxq_group *g); inline uint64_t mxq_group_jobs_active(struct mxq_group *g); inline uint64_t mxq_group_jobs_inq(struct mxq_group *g); -int mxq_load_group(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t group_id); -int mxq_load_all_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups); -int mxq_load_all_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid); -int mxq_load_active_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid); -int mxq_load_running_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups); -int mxq_load_running_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid); - -int mxq_group_load_active_groups(MYSQL *mysql, struct mxq_group **mxq_group); -void mxq_group_free_content(struct mxq_group *g); -int mxq_group_update_status_cancelled(MYSQL *mysql, struct mxq_group *group); +int mxq_load_group(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t group_id); +int mxq_load_all_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups); +int mxq_load_active_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups); +int mxq_load_all_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid); +int mxq_load_active_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid); +int mxq_load_running_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups); +int mxq_load_running_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid); #endif diff --git a/mxq_job.c b/mxq_job.c index 428a7f2..354d574 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -1,4 +1,3 @@ - #define _GNU_SOURCE #include @@ -7,6 +6,7 @@ #include #include #include +#include #include @@ -15,7 +15,7 @@ #include "mxq_group.h" #include "mxq_job.h" -#include "mxq_mysql.h" +#include "mxq_util.h" #define JOB_FIELDS_CNT 34 #define JOB_FIELDS \ @@ -60,79 +60,6 @@ " stats_nvcsw, " \ " stats_nivcsw" -#define MXQ_JOB_FIELDS "job_id, " \ - "job_status, " \ - "job_flags, " \ - "job_priority, " \ - "group_id, " \ - "job_workdir, " \ - "job_argc, " \ - "job_argv, " \ - "job_stdout, " \ - "job_stderr, " \ - "job_umask, " \ - "host_submit, " \ - "server_id, " \ - "host_hostname, " \ - "host_pid, " \ - "host_slots, " \ - "UNIX_TIMESTAMP(date_submit) as date_submit, " \ - "UNIX_TIMESTAMP(date_start) as date_start, " \ - "UNIX_TIMESTAMP(date_end) as date_end, " \ - "stats_status, " \ - "stats_utime_sec, " \ - "stats_utime_usec, " \ - "stats_stime_sec, " \ - "stats_stime_usec, " \ - "stats_real_sec, " \ - "stats_real_usec, " \ - "stats_maxrss, " \ - "stats_minflt, " \ - "stats_majflt, " \ - "stats_nswap, " \ - "stats_inblock, " \ - "stats_oublock, " \ - "stats_nvcsw, " \ - "stats_nivcsw" - -enum mxq_job_columns { - MXQ_JOB_COL_JOB_ID, - MXQ_JOB_COL_JOB_STATUS, - MXQ_JOB_COL_JOB_FLAGS, - MXQ_JOB_COL_JOB_PRIORITY, - MXQ_JOB_COL_GROUP_ID, - MXQ_JOB_COL_JOB_WORKDIR, - MXQ_JOB_COL_JOB_ARGC, - MXQ_JOB_COL_JOB_ARGV, - MXQ_JOB_COL_JOB_STDOUT, - MXQ_JOB_COL_JOB_STDERR, - MXQ_JOB_COL_JOB_UMASK, - MXQ_JOB_COL_HOST_SUBMIT, - MXQ_JOB_COL_SERVER_ID, - MXQ_JOB_COL_HOST_HOSTNAME, - MXQ_JOB_COL_HOST_PID, - MXQ_JOB_COL_HOST_SLOTS, - MXQ_JOB_COL_DATE_SUBMIT, - MXQ_JOB_COL_DATE_START, - MXQ_JOB_COL_DATE_END, - MXQ_JOB_COL_STATS_STATUS, - MXQ_JOB_COL_STATS_UTIME_SEC, - MXQ_JOB_COL_STATS_UTIME_USEC, - MXQ_JOB_COL_STATS_STIME_SEC, - MXQ_JOB_COL_STATS_STIME_USEC, - MXQ_JOB_COL_STATS_REAL_SEC, - MXQ_JOB_COL_STATS_REAL_USEC, - MXQ_JOB_COL_STATS_MAXRSS, - MXQ_JOB_COL_STATS_MINFLT, - MXQ_JOB_COL_STATS_MAJFLT, - MXQ_JOB_COL_STATS_NSWAP, - MXQ_JOB_COL_STATS_INBLOCK, - MXQ_JOB_COL_STATS_OUBLOCK, - MXQ_JOB_COL_STATS_NVCSW, - MXQ_JOB_COL_STATS_NIVCSW, - MXQ_JOB_COL__END -}; - static int bind_result_job_fields(struct mx_mysql_bind *result, struct mxq_job *j) { int res = 0; @@ -219,112 +146,6 @@ char *mxq_job_status_to_name(uint64_t status) return "invalid"; } -static inline int mxq_job_bind_results(MYSQL_BIND *bind, struct mxq_job *j) -{ - memset(bind, 0, sizeof(*bind)*MXQ_JOB_COL__END); - - MXQ_MYSQL_BIND_UINT64(bind, MXQ_JOB_COL_JOB_ID, &j->job_id); - MXQ_MYSQL_BIND_UINT16(bind, MXQ_JOB_COL_JOB_STATUS, &j->job_status); - MXQ_MYSQL_BIND_UINT64(bind, MXQ_JOB_COL_JOB_FLAGS, &j->job_flags); - MXQ_MYSQL_BIND_UINT16(bind, MXQ_JOB_COL_JOB_PRIORITY, &j->job_priority); - - MXQ_MYSQL_BIND_UINT64(bind, MXQ_JOB_COL_GROUP_ID, &j->group_id); - - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_JOB_COL_JOB_WORKDIR, &j->_job_workdir_length); - - MXQ_MYSQL_BIND_UINT16(bind, MXQ_JOB_COL_JOB_ARGC, &j->job_argc); - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_JOB_COL_JOB_ARGV, &j->_job_argv_str_length); - - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_JOB_COL_JOB_STDOUT, &j->_job_stdout_length); - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_JOB_COL_JOB_STDERR, &j->_job_stderr_length); - - MXQ_MYSQL_BIND_UINT32(bind, MXQ_JOB_COL_JOB_UMASK, &j->job_umask); - - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_JOB_COL_HOST_SUBMIT, &j->_host_submit_length); - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_JOB_COL_SERVER_ID, &j->_server_id_length); - - MXQ_MYSQL_BIND_VARSTR(bind, MXQ_JOB_COL_HOST_HOSTNAME, &j->_host_hostname_length); - MXQ_MYSQL_BIND_UINT32(bind, MXQ_JOB_COL_HOST_PID, &j->host_pid); - MXQ_MYSQL_BIND_UINT32(bind, MXQ_JOB_COL_HOST_SLOTS, &j->host_slots); - - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_DATE_SUBMIT, &j->date_submit); - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_DATE_START, &j->date_start); - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_DATE_END, &j->date_end); - - MXQ_MYSQL_BIND_UINT32(bind, MXQ_JOB_COL_STATS_UTIME_SEC, &j->stats_rusage.ru_utime.tv_sec); - MXQ_MYSQL_BIND_UINT32(bind, MXQ_JOB_COL_STATS_UTIME_USEC, &j->stats_rusage.ru_utime.tv_usec); - - MXQ_MYSQL_BIND_UINT32(bind, MXQ_JOB_COL_STATS_STIME_SEC, &j->stats_rusage.ru_stime.tv_sec); - MXQ_MYSQL_BIND_UINT32(bind, MXQ_JOB_COL_STATS_STIME_USEC, &j->stats_rusage.ru_stime.tv_usec); - - MXQ_MYSQL_BIND_UINT32(bind, MXQ_JOB_COL_STATS_REAL_SEC, &j->stats_realtime.tv_sec); - MXQ_MYSQL_BIND_UINT32(bind, MXQ_JOB_COL_STATS_REAL_USEC, &j->stats_realtime.tv_usec); - - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_STATS_STATUS, &j->stats_status); - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_STATS_MAXRSS, &j->stats_rusage.ru_maxrss); - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_STATS_MINFLT, &j->stats_rusage.ru_minflt); - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_STATS_MAJFLT, &j->stats_rusage.ru_majflt); - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_STATS_NSWAP, &j->stats_rusage.ru_nswap); - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_STATS_INBLOCK, &j->stats_rusage.ru_inblock); - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_STATS_OUBLOCK, &j->stats_rusage.ru_oublock); - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_STATS_NVCSW, &j->stats_rusage.ru_nvcsw); - MXQ_MYSQL_BIND_INT32(bind, MXQ_JOB_COL_STATS_NIVCSW, &j->stats_rusage.ru_nivcsw); - - return 1; -} - -int mxq_job_fetch_results(MYSQL_STMT *stmt, MYSQL_BIND *bind, struct mxq_job *j) -{ - int res; - - memset(j, 0, sizeof(*j)); - - res = mxq_mysql_stmt_fetch_row(stmt); - if (!res) { - if (errno == ENOENT) - return 0; - perror("xxxx0"); - return -1; - } - - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_JOB_COL_JOB_WORKDIR, &(j->job_workdir), j->_job_workdir_length); - if (!res) { - return -1; - } - - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_JOB_COL_JOB_ARGV, &(j->job_argv_str), j->_job_argv_str_length); - if (!res) { - return -1; - } - - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_JOB_COL_JOB_STDOUT, &(j->job_stdout), j->_job_stdout_length); - if (!res) { - return -1; - } - - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_JOB_COL_JOB_STDERR, &(j->job_stderr), j->_job_stderr_length); - if (!res) { - return -1; - } - - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_JOB_COL_HOST_SUBMIT, &(j->host_submit), j->_host_submit_length); - if (!res) { - return -1; - } - - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_JOB_COL_SERVER_ID, &(j->server_id), j->_server_id_length); - if (!res) { - return -1; - } - - res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_JOB_COL_HOST_HOSTNAME, &(j->host_hostname), j->_host_hostname_length); - if (!res) { - return -1; - } - - return 1; -} - void mxq_job_free_content(struct mxq_job *j) { free_null(j->job_workdir); @@ -380,6 +201,7 @@ int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job res = mx_mysql_bind_init_param(¶m, 1); assert(res == 0); + res = mx_mysql_bind_var(¶m, 0, uint64, &job_id); assert(res == 0); @@ -417,6 +239,7 @@ int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, st res = mx_mysql_bind_init_param(¶m, 1); assert(res == 0); + res = mx_mysql_bind_var(¶m, 0, uint64, &(grp->group_id)); assert(res == 0); @@ -472,15 +295,19 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job ** return res; } -int mxq_job_update_status_assigned(MYSQL *mysql, struct mxq_job *job) +int mxq_assign_job_from_group_to_server(struct mx_mysql *mysql, uint64_t group_id, char *hostname, char *server_id) { - char *query; - MYSQL_BIND param[3]; - int res; + int res; + struct mx_mysql_bind param = {0}; - memset(param, 0, sizeof(param)); + assert(mysql); + assert(hostname); + assert(*hostname); + assert(server_id); + assert(*server_id); - query = "UPDATE mxq_job SET" + char *query = + "UPDATE mxq_job SET" " job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) "," " host_hostname = ?," " server_id = ?" @@ -492,26 +319,34 @@ int mxq_job_update_status_assigned(MYSQL *mysql, struct mxq_job *job) " ORDER BY job_priority, job_id" " LIMIT 1"; - MXQ_MYSQL_BIND_STRING(param, 0, job->host_hostname); - MXQ_MYSQL_BIND_STRING(param, 1, job->server_id); - MXQ_MYSQL_BIND_UINT64(param, 2, &job->group_id); + res = mx_mysql_bind_init_param(¶m, 3); + assert(res == 0); - res = mxq_mysql_do_update(mysql, query, param); + res = 0; + res += mx_mysql_bind_var(¶m, 0, string, &hostname); + res += mx_mysql_bind_var(¶m, 1, string, &server_id); + res += mx_mysql_bind_var(¶m, 2, uint64, &group_id); + assert(res == 0); - job->job_status = MXQ_JOB_STATUS_ASSIGNED; + res = mx_mysql_do_statement_noresult(mysql, query, ¶m); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } return res; } -int mxq_job_update_status_loaded(MYSQL *mysql, struct mxq_job *job) +int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job) { - char *query; - MYSQL_BIND param[3]; - int res; + int res; + struct mx_mysql_bind param = {0}; - memset(param, 0, sizeof(param)); + assert(mysql); + assert(job); - query = "UPDATE mxq_job SET" + char *query = + "UPDATE mxq_job SET" " job_status = " status_str(MXQ_JOB_STATUS_LOADED) " WHERE job_id = ?" " AND job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) @@ -519,24 +354,33 @@ int mxq_job_update_status_loaded(MYSQL *mysql, struct mxq_job *job) " AND server_id = ?" " AND host_pid = 0"; - MXQ_MYSQL_BIND_UINT64(param, 0, &job->job_id); - MXQ_MYSQL_BIND_STRING(param, 1, job->host_hostname); - MXQ_MYSQL_BIND_STRING(param, 2, job->server_id); + res = mx_mysql_bind_init_param(¶m, 3); + assert(res == 0); - res = mxq_mysql_do_update(mysql, query, param); + res = 0; + res += mx_mysql_bind_var(¶m, 0, uint64, &(job->job_id)); + res += mx_mysql_bind_var(¶m, 1, string, &(job->host_hostname)); + res += mx_mysql_bind_var(¶m, 2, string, &(job->server_id)); + assert(res == 0); + + res = mx_mysql_do_statement_noresult(mysql, query, ¶m); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } job->job_status = MXQ_JOB_STATUS_LOADED; return res; } -int mxq_job_update_status_running(MYSQL *mysql, struct mxq_job *job) +int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job) { - char *query; - MYSQL_BIND param[5]; - int res; + int res; + struct mx_mysql_bind param = {0}; - memset(param, 0, sizeof(param)); + assert(mysql); + assert(job); if (job->job_status != MXQ_JOB_STATUS_LOADED) { mx_log_warning("new status==runnning but old status(=%d) is != loaded ", job->job_status); @@ -546,7 +390,9 @@ int mxq_job_update_status_running(MYSQL *mysql, struct mxq_job *job) return -1; } } - query = "UPDATE mxq_job SET" + + char *query = + "UPDATE mxq_job SET" " job_status = " status_str(MXQ_JOB_STATUS_RUNNING) "," " date_start = NULL," " host_pid = ?," @@ -557,27 +403,36 @@ int mxq_job_update_status_running(MYSQL *mysql, struct mxq_job *job) " AND server_id = ?" " AND host_pid = 0"; - MXQ_MYSQL_BIND_UINT32(param, 0, &job->host_pid); - MXQ_MYSQL_BIND_UINT32(param, 1, &job->host_slots); - MXQ_MYSQL_BIND_UINT64(param, 2, &job->job_id); - MXQ_MYSQL_BIND_STRING(param, 3, job->host_hostname); - MXQ_MYSQL_BIND_STRING(param, 4, job->server_id); + res = mx_mysql_bind_init_param(¶m, 5); + assert(res == 0); - res = mxq_mysql_do_update(mysql, query, param); + res = 0; + res += mx_mysql_bind_var(¶m, 0, uint32, &(job->host_pid)); + res += mx_mysql_bind_var(¶m, 1, uint32, &(job->host_slots)); + res += mx_mysql_bind_var(¶m, 2, uint64, &(job->job_id)); + res += mx_mysql_bind_var(¶m, 3, string, &(job->host_hostname)); + res += mx_mysql_bind_var(¶m, 4, string, &(job->server_id)); + assert(res == 0); + + res = mx_mysql_do_statement_noresult(mysql, query, ¶m); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } job->job_status = MXQ_JOB_STATUS_RUNNING; return res; } -int mxq_job_update_status_exit(MYSQL *mysql, struct mxq_job *job) +int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job) { - char *query; - MYSQL_BIND param[20]; + int res; uint16_t newstatus; - int res; + struct mx_mysql_bind param = {0}; - memset(param, 0, sizeof(param)); + assert(mysql); + assert(job); if (WIFEXITED(job->stats_status)) { if (WEXITSTATUS(job->stats_status)) { @@ -592,7 +447,13 @@ int mxq_job_update_status_exit(MYSQL *mysql, struct mxq_job *job) errno = EINVAL; return -1; } - query = "UPDATE mxq_job SET" + + if (job->job_status != MXQ_JOB_STATUS_RUNNING) { + mx_log_warning("new status==exited but old status(=%d) is != running ", job->job_status); + } + + char *query = + "UPDATE mxq_job SET" " job_status = ?," " date_end = NULL," " stats_status = ?, " @@ -616,34 +477,37 @@ int mxq_job_update_status_exit(MYSQL *mysql, struct mxq_job *job) " AND server_id = ?" " AND host_pid = ?"; - MXQ_MYSQL_BIND_UINT16(param, 0, &newstatus); - - MXQ_MYSQL_BIND_INT32(param, 1, &job->stats_status); - - MXQ_MYSQL_BIND_UINT32(param, 2, &job->stats_rusage.ru_utime.tv_sec); - MXQ_MYSQL_BIND_UINT32(param, 3, &job->stats_rusage.ru_utime.tv_usec); - - MXQ_MYSQL_BIND_UINT32(param, 4, &job->stats_rusage.ru_stime.tv_sec); - MXQ_MYSQL_BIND_UINT32(param, 5, &job->stats_rusage.ru_stime.tv_usec); - - MXQ_MYSQL_BIND_UINT32(param, 6, &job->stats_realtime.tv_sec); - MXQ_MYSQL_BIND_UINT32(param, 7, &job->stats_realtime.tv_usec); - - MXQ_MYSQL_BIND_INT32(param, 8, &job->stats_rusage.ru_maxrss); - MXQ_MYSQL_BIND_INT32(param, 9, &job->stats_rusage.ru_minflt); - MXQ_MYSQL_BIND_INT32(param, 10, &job->stats_rusage.ru_majflt); - MXQ_MYSQL_BIND_INT32(param, 11, &job->stats_rusage.ru_nswap); - MXQ_MYSQL_BIND_INT32(param, 12, &job->stats_rusage.ru_inblock); - MXQ_MYSQL_BIND_INT32(param, 13, &job->stats_rusage.ru_oublock); - MXQ_MYSQL_BIND_INT32(param, 14, &job->stats_rusage.ru_nvcsw); - MXQ_MYSQL_BIND_INT32(param, 15, &job->stats_rusage.ru_nivcsw); + res = mx_mysql_bind_init_param(¶m, 20); + assert(res == 0); - MXQ_MYSQL_BIND_UINT64(param, 16, &job->job_id); - MXQ_MYSQL_BIND_STRING(param, 17, job->host_hostname); - MXQ_MYSQL_BIND_STRING(param, 18, job->server_id); - MXQ_MYSQL_BIND_UINT32(param, 19, &job->host_pid); + res = 0; + res += mx_mysql_bind_var(¶m, 0, uint16, &(newstatus)); + res += mx_mysql_bind_var(¶m, 1, int32, &(job->stats_status)); + res += mx_mysql_bind_var(¶m, 2, int64, &(job->stats_rusage.ru_utime.tv_sec)); + res += mx_mysql_bind_var(¶m, 3, int64, &(job->stats_rusage.ru_utime.tv_usec)); + res += mx_mysql_bind_var(¶m, 4, int64, &(job->stats_rusage.ru_stime.tv_sec)); + res += mx_mysql_bind_var(¶m, 5, int64, &(job->stats_rusage.ru_stime.tv_usec)); + res += mx_mysql_bind_var(¶m, 6, int64, &(job->stats_realtime.tv_sec)); + res += mx_mysql_bind_var(¶m, 7, int64, &(job->stats_realtime.tv_usec)); + res += mx_mysql_bind_var(¶m, 8, int64, &(job->stats_rusage.ru_maxrss)); + res += mx_mysql_bind_var(¶m, 9, int64, &(job->stats_rusage.ru_minflt)); + res += mx_mysql_bind_var(¶m, 10, int64, &(job->stats_rusage.ru_majflt)); + res += mx_mysql_bind_var(¶m, 11, int64, &(job->stats_rusage.ru_nswap)); + res += mx_mysql_bind_var(¶m, 12, int64, &(job->stats_rusage.ru_inblock)); + res += mx_mysql_bind_var(¶m, 13, int64, &(job->stats_rusage.ru_oublock)); + res += mx_mysql_bind_var(¶m, 14, int64, &(job->stats_rusage.ru_nvcsw)); + res += mx_mysql_bind_var(¶m, 15, int64, &(job->stats_rusage.ru_nivcsw)); + res += mx_mysql_bind_var(¶m, 16, uint64, &(job->job_id)); + res += mx_mysql_bind_var(¶m, 17, string, &(job->host_hostname)); + res += mx_mysql_bind_var(¶m, 18, string, &(job->server_id)); + res += mx_mysql_bind_var(¶m, 19, uint32, &(job->host_pid)); + assert(res == 0); - res = mxq_mysql_do_update(mysql, query, param); + res = mx_mysql_do_statement_noresult(mysql, query, ¶m); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } job->job_status = newstatus; @@ -652,8 +516,6 @@ int mxq_job_update_status_exit(MYSQL *mysql, struct mxq_job *job) int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j) { - int res; - if (!streq(j->job_stdout, "/dev/null")) { _mx_cleanup_free_ char *dir = NULL; @@ -680,86 +542,98 @@ int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j) return 1; } - -int mxq_job_load_assigned(MYSQL *mysql, struct mxq_job *job, char *hostname, char *server_id) +int mxq_load_job_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id) { - MYSQL_STMT *stmt; - MYSQL_BIND result[MXQ_JOB_COL__END]; - char *query; - MYSQL_BIND param[2]; int res; + struct mxq_job *jobs = NULL; + struct mxq_job j = {0}; + struct mx_mysql_bind param = {0}; + struct mx_mysql_bind result = {0}; + assert(mysql); + assert(mxq_jobs); + assert(!(*mxq_jobs)); assert(hostname); + assert(*hostname); assert(server_id); + assert(*server_id); - memset(param, 0, sizeof(param)); - MXQ_MYSQL_BIND_STRING(param, 0, hostname); - MXQ_MYSQL_BIND_STRING(param, 1, server_id); - - mxq_job_bind_results(result, job); - - query = "SELECT " MXQ_JOB_FIELDS " FROM mxq_job" + char *query = + "SELECT" + JOB_FIELDS + " FROM mxq_job" " WHERE job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) " AND host_hostname = ?" " AND server_id = ?" " LIMIT 1"; - stmt = mxq_mysql_stmt_do_query(mysql, query, MXQ_JOB_COL__END, param, result); - if (!stmt) { - mx_log_err("mxq_job_load_assigned(mysql=%p, stmt_str=\"%s\", field_count=%d, param=%p, result=%p)", mysql, query, MXQ_JOB_COL__END, param, result); - return -1; - } + res = mx_mysql_bind_init_param(¶m, 2); + assert(res == 0); + + res = 0; + res += mx_mysql_bind_var(¶m, 0, string, &hostname); + res += mx_mysql_bind_var(¶m, 1, string, &server_id); + assert(res == 0); - res = mxq_job_fetch_results(stmt, result, job); + res = bind_result_job_fields(&result, &j); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); if (res < 0) { - mxq_mysql_print_error(mysql); - mxq_mysql_stmt_print_error(stmt); - mx_log_err("mxq_job_fetch_results.."); - mysql_stmt_close(stmt); - return -1; + mx_log_err("mx_mysql_do_statement(): %m"); + return res; } - mysql_stmt_close(stmt); + *mxq_jobs = jobs; return res; } -int mxq_job_load(MYSQL *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id) +int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id) { int res; + struct mxq_job *jobs = NULL; - memset(mxqjob, 0, sizeof(*mxqjob)); + assert(mysql); + assert(mxqjob); + assert(hostname); + assert(*hostname); + assert(server_id); + assert(*server_id); do { - res = mxq_job_load_assigned(mysql, mxqjob, hostname, server_id); + res = mxq_load_job_assigned_to_server(mysql, &jobs, hostname, server_id); + if(res < 0) { - mx_log_err(" group_id=%lu job_id=%lu :: mxq_job_load_assigned: %m", group_id, mxqjob->job_id); + mx_log_err(" group_id=%lu :: mxq_load_job_assigned_to_server: %m", group_id); return 0; } if(res == 1) { + memcpy(mxqjob, &jobs[0], sizeof(*mxqjob)); break; } - mxqjob->host_hostname = hostname; - mxqjob->server_id = server_id; - mxqjob->group_id = group_id; - mxqjob->job_status = MXQ_JOB_STATUS_INQ; - - res = mxq_job_update_status_assigned(mysql, mxqjob); + res = mxq_assign_job_from_group_to_server(mysql, group_id, hostname, server_id); if (res < 0) { - if (errno == ENOENT) { - mx_log_warning(" group_id=%lu :: mxq_job_update_status_assigned(): No matching job found - maybe another server was a bit faster. ;)", group_id); - } else { - mx_log_err(" group_id=%lu :: mxq_job_update_status_assigned(): %m", group_id); - } + mx_log_err(" group_id=%lu :: mxq_assign_job_from_group_to_server(): %m", group_id); + return 0; + } + if (res == 0) { + mx_log_warning(" group_id=%lu :: mxq_assign_job_from_group_to_server(): No matching job found - maybe another server was a bit faster. ;)", group_id); return 0; } } while (1); - res = mxq_job_update_status_loaded(mysql, mxqjob); + res = mxq_set_job_status_loaded_on_server(mysql, mxqjob); if (res < 0) { - mx_log_err(" group_id=%lu job_id=%lu :: mxq_job_update_status_loaded(): %m", group_id, mxqjob->job_id); + mx_log_err(" group_id=%lu job_id=%lu :: mxq_set_job_status_loaded_on_server(): %m", group_id, mxqjob->job_id); + return 0; + } + if (res == 0) { + mx_log_err(" group_id=%lu job_id=%lu :: mxq_set_job_status_loaded_on_server(): Job not found", group_id, mxqjob->job_id); return 0; } + mxqjob->job_status = MXQ_JOB_STATUS_LOADED; + return 1; } diff --git a/mxq_job.h b/mxq_job.h index 674da68..3480c95 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -87,19 +87,17 @@ struct mxq_job { char *mxq_job_status_to_name(uint64_t status); +void mxq_job_free_content(struct mxq_job *j); + int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id); int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp); int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status); - -int mxq_job_load_assigned(MYSQL *mysql, struct mxq_job *job, char *hostname, char *server_id); -void mxq_job_free_content(struct mxq_job *j); -int mxq_job_load(MYSQL *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id); -int mxq_job_update_status_assigned(MYSQL *mysql, struct mxq_job *job); -int mxq_job_update_status_loaded(MYSQL *mysql, struct mxq_job *job); -int mxq_job_update_status_running(MYSQL *mysql, struct mxq_job *job); -int mxq_job_update_status_exit(MYSQL *mysql, struct mxq_job *job); +int mxq_assign_job_from_group_to_server(struct mx_mysql *mysql, uint64_t group_id, char *hostname, char *server_id); +int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job); +int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job); +int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job); int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j); - -int mxq_job_update_status_cancelled_by_group(MYSQL *mysql, struct mxq_group *group); +int mxq_load_job_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id); +int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id); #endif diff --git a/mxq_mysql.c b/mxq_mysql.c deleted file mode 100644 index 7b652d4..0000000 --- a/mxq_mysql.c +++ /dev/null @@ -1,207 +0,0 @@ - -#define _GNU_SOURCE - -#include -#include -#include -#include -#include - -#include - -#include -#include - -#include - -#include - -#include "mx_log.h" -#include "mx_util.h" -#include "mxq_mysql.h" -#include "mxq_util.h" - - -MYSQL *mxq_mysql_connect(struct mxq_mysql *mmysql) -{ - MYSQL *mysql; - MYSQL *mres; - - my_bool reconnect = 1; - int try = 1; - - mysql = mysql_init(NULL); - if (!mysql) - return NULL; - - while (1) { - if (mmysql->default_file && *mmysql->default_file) - if (*mmysql->default_file != '/' || euidaccess(mmysql->default_file, R_OK) == 0) - mysql_options(mysql, MYSQL_READ_DEFAULT_FILE, mmysql->default_file); - - if (mmysql->default_group && *mmysql->default_group) - mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, mmysql->default_group); - else - mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "mxq"); - - mysql_options(mysql, MYSQL_OPT_RECONNECT, &reconnect); - - mres = mysql_real_connect(mysql, NULL, NULL, NULL, NULL, 0, NULL, 0); - if (mres == mysql) - return mysql; - - mx_log_err("MAIN: Failed to connect to database (try=%d): Error: %s", try++, mysql_error(mysql)); - sleep(1); - } - return NULL; -} - -void mxq_mysql_close(MYSQL *mysql) { - mysql_close(mysql); - mysql_library_end(); -} - -MYSQL_STMT *mxq_mysql_stmt_do_query(MYSQL *mysql, char *stmt_str, int field_count, MYSQL_BIND *param, MYSQL_BIND *result) -{ - MYSQL_STMT *stmt; - int res; - long tries = 0; - struct timespec nsleep = {0}; - - assert(mysql); - assert(stmt_str); - assert(field_count >= 0); - - stmt = mysql_stmt_init(mysql); - if (!stmt) { - mx_log_err("mysql_stmt_init(mysql=%p)", mysql); - mxq_mysql_print_error(mysql); - return NULL; - } - - res = mysql_stmt_prepare(stmt, stmt_str, strlen(stmt_str)); - if (res) { - mx_log_err("mysql_stmt_prepare(stmt=%p, stmt_str=\"%s\", length=%ld)", stmt, stmt_str, strlen(stmt_str)); - mxq_mysql_stmt_print_error(stmt); - mysql_stmt_close(stmt); - return NULL; - } - - if (mysql_stmt_field_count(stmt) != field_count) { - mx_log_err("mysql_stmt_field_count(stmt=%p) does not match requested field_count (=%d)", stmt, field_count); - mysql_stmt_close(stmt); - return NULL; - } - - if (result) { - res = mysql_stmt_bind_result(stmt, result); - if (res) { - mx_log_err("mysql_stmt_bind_result(stmt=%p)", stmt); - mxq_mysql_stmt_print_error(stmt); - mysql_stmt_close(stmt); - return NULL; - } - } - - if (param) { - res = mysql_stmt_bind_param(stmt, param); - if (res) { - mx_log_err("mysql_stmt_bind_param(stmt=%p)", stmt); - mxq_mysql_stmt_print_error(stmt); - mysql_stmt_close(stmt); - return NULL; - } - } - - do { - res = mysql_stmt_execute(stmt); - if (!res) - break; - - if (mysql_stmt_errno(stmt) == ER_LOCK_DEADLOCK) { - mx_log_warning("MySQL recoverable error detected in mysql_stmt_execute(): ER_LOCK_DEADLOCK. (try %ld).", ++tries); - nsleep.tv_nsec = tries*1000000L; - nanosleep(&nsleep, NULL); - continue; - } - - if (mysql_stmt_errno(stmt) == ER_LOCK_WAIT_TIMEOUT) { - mx_log_warning("MySQL recoverable error detected in mysql_stmt_execute(): ER_LOCK_WAIT_TIMEOUT. (try %ld).", ++tries); - nsleep.tv_nsec = tries*1000000L; - nanosleep(&nsleep, NULL); - continue; - } - - mxq_mysql_stmt_print_error(stmt); - mysql_stmt_close(stmt); - return NULL; - } while(1); - - return stmt; -} - -int mxq_mysql_stmt_fetch_string(MYSQL_STMT *stmt, MYSQL_BIND *bind, int col, char **buf, unsigned long len) -{ - char *s; - int res; - - if (len > 0) { - s = malloc(len+1); - if (!s) { - errno = ENOMEM; - return 0; - } - bind[col].buffer = *buf = s; - bind[col].buffer_length = len; - - res = mysql_stmt_fetch_column(stmt, &bind[col], col, 0); - s[len] = 0; - - if (res) { - errno = EIO; - return 0; - } - } - return 1; -} - -int mxq_mysql_stmt_fetch_row(MYSQL_STMT *stmt) -{ - int res; - - res = mysql_stmt_fetch(stmt); - - if (res && res != MYSQL_DATA_TRUNCATED) { - if (res == MYSQL_NO_DATA) { - errno = ENOENT; - return 0; - } - mxq_mysql_stmt_print_error(stmt); - errno = EIO; - return 0; - } - return 1; -} - -int mxq_mysql_do_update(MYSQL *mysql, char* query, MYSQL_BIND *param) -{ - MYSQL_STMT *stmt; - int res; - - stmt = mxq_mysql_stmt_do_query(mysql, query, 0, param, NULL); - if (!stmt) { - mx_log_err("mxq_mysql_do_update: Failed to query database."); - errno = EIO; - return -1; - } - - res = mysql_stmt_affected_rows(stmt); - mysql_stmt_close(stmt); - - if (res == 0) { - errno = ENOENT; - return -1; - } - - return res; -} diff --git a/mxq_mysql.h b/mxq_mysql.h deleted file mode 100644 index 26f012a..0000000 --- a/mxq_mysql.h +++ /dev/null @@ -1,83 +0,0 @@ - -#ifndef __MXQ_MYSQL_H__ -#define __MXQ_MYSQL_H__ 1 - -#include - -#include "mx_log.h" -#include "mxq_util.h" - -#include - -struct mxq_mysql { - char *default_file; - char *default_group; -}; - -MYSQL *mxq_mysql_connect(struct mxq_mysql *mmysql); -void mxq_mysql_close(MYSQL *mysql); - -int mxq_mysql_stmt_fetch_string(MYSQL_STMT *stmt, MYSQL_BIND *bind, int col, char **buf, unsigned long len); -int mxq_mysql_stmt_fetch_row(MYSQL_STMT *stmt); - -MYSQL_STMT *mxq_mysql_stmt_do_query(MYSQL *mysql, char *stmt_str, int field_count, MYSQL_BIND *param, MYSQL_BIND *result); - -int mxq_mysql_do_update(MYSQL *mysql, char* query, MYSQL_BIND *param); - -#define MXQ_MYSQL_BIND_INT(b, c, v, t, s) \ - do { \ - (b)[(c)].buffer_type = (t); \ - (b)[(c)].buffer = (v); \ - (b)[(c)].is_unsigned = (s); \ - } while (0) - -#define MXQ_MYSQL_BIND_INT64(b, c, v) MXQ_MYSQL_BIND_INT((b), (c), (v), MYSQL_TYPE_LONGLONG, 0) -#define MXQ_MYSQL_BIND_UINT64(b, c, v) MXQ_MYSQL_BIND_INT((b), (c), (v), MYSQL_TYPE_LONGLONG, 1) - -#define MXQ_MYSQL_BIND_INT32(b, c, v) MXQ_MYSQL_BIND_INT((b), (c), (v), MYSQL_TYPE_LONG, 0) -#define MXQ_MYSQL_BIND_UINT32(b, c, v) MXQ_MYSQL_BIND_INT((b), (c), (v), MYSQL_TYPE_LONG, 1) - -#define MXQ_MYSQL_BIND_INT16(b, c, v) MXQ_MYSQL_BIND_INT((b), (c), (v), MYSQL_TYPE_SHORT, 0) -#define MXQ_MYSQL_BIND_UINT16(b, c, v) MXQ_MYSQL_BIND_INT((b), (c), (v), MYSQL_TYPE_SHORT, 1) - -#define MXQ_MYSQL_BIND_INT8(b, c, v) MXQ_MYSQL_BIND_INT((b), (c), (v), MYSQL_TYPE_TINY, 0) -#define MXQ_MYSQL_BIND_UINT8(b, c, v) MXQ_MYSQL_BIND_INT((b), (c), (v), MYSQL_TYPE_TINY, 1) - -#define MXQ_MYSQL_BIND_VARSTR(b, c, v) \ - do { \ - (b)[(c)].buffer_type = MYSQL_TYPE_STRING; \ - (b)[(c)].buffer = 0; \ - (b)[(c)].buffer_length = 0; \ - (b)[(c)].length = (v); \ - } while (0) - -#define MXQ_MYSQL_BIND_STRING(b, c, v) \ - do { \ - (b)[(c)].buffer_type = MYSQL_TYPE_STRING; \ - (b)[(c)].buffer = (v); \ - (b)[(c)].buffer_length = strlen((v)); \ - (b)[(c)].length = &((b)[(c)].buffer_length); \ - } while (0) - - -#define mxq_mysql_print_error(mysql) \ - mx_log_err("MySQL: ERROR %u (%s): %s", \ - mysql_errno(mysql), \ - mysql_sqlstate(mysql), \ - mysql_error(mysql)) - -#define mxq_mysql_stmt_print_error(stmt) \ - mx_log_err("MySQL: ERROR %u (%s): %s", \ - mysql_stmt_errno(stmt), \ - mysql_stmt_sqlstate(stmt), \ - mysql_stmt_error(stmt)) - -static inline void close_mysqlp(void *p) { - if (!p) - return; - mxq_mysql_close(*(void**) p); -} - -#define _cleanup_close_mysql_ _cleanup_(close_mysqlp) - -#endif diff --git a/mxqd.c b/mxqd.c index 0c705ca..3f9662e 100644 --- a/mxqd.c +++ b/mxqd.c @@ -32,7 +32,8 @@ #include "mxq_group.h" #include "mxq_job.h" -#include "mxq_mysql.h" +#include "mx_mysql.h" +#include "mxq_util.h" #include "mxqd.h" #ifndef MXQ_VERSION @@ -318,8 +319,13 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) memset(server, 0, sizeof(*server)); - server->mmysql.default_file = arg_mysql_default_file; - server->mmysql.default_group = arg_mysql_default_group; + res = mx_mysql_initialize(&(server->mysql)); + assert(res == 0); + + mx_mysql_option_set_default_file(server->mysql, arg_mysql_default_file); + mx_mysql_option_set_default_group(server->mysql, arg_mysql_default_group); + mx_mysql_option_set_reconnect(server->mysql, 1); + server->hostname = mxq_hostname(); server->server_id = arg_server_id; @@ -956,7 +962,7 @@ unsigned long start_job(struct mxq_group_list *group) server = group->user->server; - res = mxq_job_load(server->mysql, &mxqjob, group->group.group_id, server->hostname, server->server_id); + res = mxq_load_job_from_group_for_server(server->mysql, &mxqjob, group->group.group_id, server->hostname, server->server_id); if (!res) { return 0; @@ -964,7 +970,7 @@ unsigned long start_job(struct mxq_group_list *group) mx_log_info(" job=%s(%d):%lu:%lu :: new job loaded.", group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id); - mxq_mysql_close(server->mysql); + mx_mysql_disconnect(server->mysql); pid = fork(); if (pid < 0) { @@ -1018,14 +1024,17 @@ unsigned long start_job(struct mxq_group_list *group) gettimeofday(&mxqjob.stats_starttime, NULL); - server->mysql = mxq_mysql_connect(&server->mmysql); + mx_mysql_connect_forever(&(server->mysql)); mxqjob.host_pid = pid; mxqjob.host_slots = group->slots_per_job; - res = mxq_job_update_status_running(server->mysql, &mxqjob); - if (res <= 0) { - mx_log_err("mxq_job_update_status_running(): %m"); - } + res = mxq_set_job_status_running(server->mysql, &mxqjob); + if (res < 0) + mx_log_err("job=%s(%d):%lu:%lu mxq_job_update_status_running(): %m", + group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id); + if (res == 0) + mx_log_err("job=%s(%d):%lu:%lu mxq_job_update_status_running(): Job not found.", + group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id); do { job = group_add_job(group, &mxqjob); @@ -1148,7 +1157,6 @@ unsigned long start_users(struct mxq_server *server) unsigned long slots_started_total = 0; struct mxq_user_list *user, *unext=NULL; - struct mxq_group_list *group, *gnext; assert(server); @@ -1193,7 +1201,6 @@ int remove_orphaned_groups(struct mxq_server *server) { struct mxq_user_list *user, *unext, *uprev; struct mxq_group_list *group, *gnext, *gprev; - struct mxq_job_list *job; int cnt=0; for (user=server->users, uprev=NULL; user; user=unext) { @@ -1441,7 +1448,7 @@ int catchall(struct mxq_server *server) { mx_log_info(" job=%s(%d):%lu:%lu host_pid=%d stats_status=%d :: child process returned.", g->user_name, g->user_uid, g->group_id, j->job_id, pid, status); - mxq_job_update_status_exit(server->mysql, j); + mxq_set_job_status_exited(server->mysql, j); if (j->job_status == MXQ_JOB_STATUS_FINISHED) { g->group_jobs_finished++; @@ -1484,7 +1491,7 @@ int load_groups(struct mxq_server *server) { int total; int i; - group_cnt = mxq_group_load_active_groups(server->mysql, &mxqgroups); + group_cnt = mxq_load_active_groups(server->mysql, &mxqgroups); for (i=0, total=0; i