From b80317484adef66f671f37bcaa9762eabe79e99f Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 15 Aug 2024 10:55:43 +0200 Subject: [PATCH 1/4] mxqkill: Add function `cancel_group` and refactor group cancellation logic Refactor the group cancellation process by moving the code from main() into new functions, similar to the existing jobs cancellation functions. Added `cancel_group` function along with its helper functions. Notable change: - Removed constraints from the old code, making it no longer mandatory for the group to have `groups_status OK`, or pending/running jobs. This allows cancellations of both inactive and previously cancelled groups. These changes enable more flexible group management and simplify the cancellation logic. Note: The old code will be removed in the next commit for cleaner diffs and improved clarity. --- mxqkill.c | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/mxqkill.c b/mxqkill.c index 05890a3..1dc48d5 100644 --- a/mxqkill.c +++ b/mxqkill.c @@ -207,6 +207,86 @@ static void cancel_job(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_ui set_job_cancelled(mysql, job_id); } +static void verify_group_owner(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) { + struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, + "SELECT user_uid FROM mxq_group" + " WHERE group_id = ?" + ); + if (!stmt) + mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); + + mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id); + + unsigned long long num_rows; + int res = mx_mysql_statement_execute(stmt, &num_rows); + if (res < 0) + mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + + if (num_rows == 0) + mx_die("no such group_id %lu\n", group_id); + + uint64_t uid; + mx_mysql_statement_result_bind(stmt, 0, uint64, &uid); + + res = mx_mysql_statement_fetch(stmt); + if (res < 0) + mx_die("mx_mysql_statement_fetch: %s\n", mx_mysql_error()); + + if (uid != user_uid) + mx_die("group %lu: permission denied\n", group_id); + + mx_mysql_statement_close(&stmt); +} + +static void set_group_cancelled(struct mx_mysql *mysql, uint64_t group_id) { + struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, + "UPDATE mxq_group SET group_status = " status_str(MXQ_GROUP_STATUS_CANCELLED) + " WHERE group_id = ?" + ); + if (!stmt) + mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); + + mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id); + + unsigned long long num_rows; + int res = mx_mysql_statement_execute(stmt, &num_rows); + if (res < 0) + mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + if (num_rows == 0) + mx_die("no such group_id %lu\n", group_id); + + mx_mysql_statement_close(&stmt); +} + +static void cancel_pending_jobs(struct mx_mysql *mysql, uint64_t group_id) { + struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, + "UPDATE mxq_job SET job_cancelled = TRUE" + " WHERE group_id = ?" + " AND job_status = " status_str(MXQ_JOB_STATUS_INQ) + ); + if (!stmt) + mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); + + mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id); + + unsigned long long num_rows; + int res = mx_mysql_statement_execute(stmt, &num_rows); + if (res < 0) + mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + if (num_rows) + mx_log_notice("cancelled %llu jobs in group with group_id=%lu", num_rows, group_id); + mx_mysql_statement_close(&stmt); +} + +void cancel_group(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) { + + verify_group_owner(mysql, group_id, user_uid); + set_group_cancelled(mysql, group_id); + cancel_pending_jobs(mysql, group_id); + mx_log_notice("marked all running jobs in group with group_id=%lu to be killed by executing servers.", group_id); + mx_log_notice("deactivated group with group_id=%lu", group_id); +} + int main(int argc, char *argv[]) { struct mx_mysql *mysql = NULL; From 431948571f30b35ac2ccae01a028c9fb4afaff76 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 15 Aug 2024 10:59:46 +0200 Subject: [PATCH 2/4] mxqkill: Use cancel_group Use cancel_group() and remove old code. --- mxqkill.c | 126 ++---------------------------------------------------- 1 file changed, 4 insertions(+), 122 deletions(-) diff --git a/mxqkill.c b/mxqkill.c index 1dc48d5..910afcb 100644 --- a/mxqkill.c +++ b/mxqkill.c @@ -78,77 +78,6 @@ static void print_usage(void) ); } -static int update_group_status_cancelled(struct mx_mysql *mysql, struct mxq_group *g) -{ - struct mx_mysql_stmt *stmt = NULL; - unsigned long long num_rows = 0; - int res; - - assert(g->group_id); - - stmt = mx_mysql_statement_prepare(mysql, - "UPDATE mxq_group SET" - " group_status = " status_str(MXQ_GROUP_STATUS_CANCELLED) - " WHERE group_id = ?" - " AND group_status = " status_str(MXQ_GROUP_STATUS_OK) - " AND user_uid = ?" - " AND group_jobs-group_jobs_finished-group_jobs_failed-group_jobs_cancelled-group_jobs_unknown > 0" - ); - if (!stmt) { - mx_log_err("mx_mysql_statement_prepare(): %s", mx_mysql_error()); - return -(errno=EIO); - } - - mx_mysql_statement_param_bind(stmt, 0, uint64, &(g->group_id)); - mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); - - res = mx_mysql_statement_execute(stmt, &num_rows); - - if (res < 0) - mx_log_err("mx_mysql_statement_execute(): %s", mx_mysql_error()); - - mx_mysql_statement_close(&stmt); - - if (res < 0) - return -(errno=-res); - - assert(num_rows <= 1); - return (int)num_rows; -} - -static int update_job_status_cancelled_by_group(struct mx_mysql *mysql, struct mxq_group *g) -{ - struct mx_mysql_stmt *stmt = NULL; - unsigned long long num_rows = 0; - int res; - - assert(g->group_id); - - stmt = mx_mysql_statement_prepare(mysql, - "UPDATE mxq_job SET job_cancelled = TRUE" - " WHERE group_id = ?" - " AND job_status = " status_str(MXQ_JOB_STATUS_INQ) - ); - if (!stmt) { - mx_log_err("mx_mysql_statement_prepare(): %s", mx_mysql_error()); - return -(errno=EIO); - } - - mx_mysql_statement_param_bind(stmt, 0, uint64, &(g->group_id)); - - res = mx_mysql_statement_execute(stmt, &num_rows); - - if (res < 0) - mx_log_err("mx_mysql_statement_execute(): %s", mx_mysql_error()); - - mx_mysql_statement_close(&stmt); - - if (res < 0) - return -(errno=-res); - - return (int)num_rows; -} - static void verify_job_owner(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) { struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, "SELECT user_uid FROM mxq_job, mxq_group" @@ -278,7 +207,7 @@ static void cancel_pending_jobs(struct mx_mysql *mysql, uint64_t group_id) { mx_mysql_statement_close(&stmt); } -void cancel_group(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) { +static void cancel_group(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) { verify_group_owner(mysql, group_id, user_uid); set_group_cancelled(mysql, group_id); @@ -290,7 +219,6 @@ void cancel_group(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) int main(int argc, char *argv[]) { struct mx_mysql *mysql = NULL; - struct mxq_group group; uid_t ruid, euid, suid; struct passwd *passwd = NULL; @@ -468,57 +396,11 @@ int main(int argc, char *argv[]) mx_log_info("MySQL: Connection to database established."); - if (arg_job_id) { + if (arg_job_id) cancel_job(mysql, arg_job_id, passwd->pw_uid); - mx_mysql_finish(&mysql); - return 0; - } - - if (arg_group_id) { - memset(&group, 0, sizeof(group)); - group.group_id = arg_group_id; - group.user_uid = passwd->pw_uid; - group.user_name = passwd->pw_name; - - res = update_group_status_cancelled(mysql, &group); - - if (res <= 0) { - mx_mysql_finish(&mysql); - mx_log_info("MySQL: Connection to database closed."); - - if (res == 0) - mx_log_warning("no active group with group_id=%lu found for user=%s(%d)", - group.group_id, group.user_name, group.user_uid); - else - mx_log_err("cancelling group failed"); - return 1; - } - - assert(res == 1); - - res = update_job_status_cancelled_by_group(mysql, &group); - - mx_mysql_finish(&mysql); - mx_log_info("MySQL: Connection to database closed."); - - if (res == -1 && errno == ENOENT) - res=0; - - if (res >= 0) { - if (res) - mx_log_notice("cancelled %d jobs in group with group_id=%lu", - res, group.group_id); - mx_log_notice("marked all running jobs in group with group_id=%lu to be killed by executing servers.", - group.group_id); - mx_log_notice("deactivated group with group_id=%lu", - group.group_id); - return 0; - } else { - mx_log_err("cancelling jobs failed"); - return 1; - } - } + if (arg_group_id) + cancel_group(mysql, arg_group_id, passwd->pw_uid); mx_mysql_finish(&mysql); mx_log_info("MySQL: Connection to database closed."); From a0c8d942d801ec12c673a430a8cc176143ad79ac Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 15 Aug 2024 21:33:32 +0200 Subject: [PATCH 3/4] mxqkill: Add option --all to cancel all active jobs --- mxqkill.c | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/mxqkill.c b/mxqkill.c index 910afcb..87225f4 100644 --- a/mxqkill.c +++ b/mxqkill.c @@ -42,6 +42,7 @@ static void print_usage(void) " %s [options]\n" " %s [options] --group-id=GROUPID\n" " %s [options] --job-id=JOBID\n" + " %s [options] --all\n" "\n" "Synopsis:\n" " kill/cancel jobs running in MXQ cluster\n" @@ -50,6 +51,7 @@ static void print_usage(void) "\n" " -g, --group-id=GROUPID cancel/kill group \n" " -J, --job-id=JOBID cancel job \n" + " -A, --all cancel all running and pending jobs\n" "\n" " -v, --verbose be more verbose\n" " --debug set debug log level (default: warning log level)\n" @@ -73,6 +75,7 @@ static void print_usage(void) program_invocation_short_name, program_invocation_short_name, program_invocation_short_name, + program_invocation_short_name, MXQ_MYSQL_DEFAULT_FILE_STR, MXQ_MYSQL_DEFAULT_GROUP_STR ); @@ -216,6 +219,70 @@ static void cancel_group(struct mx_mysql *mysql, uint64_t group_id, uint64_t use mx_log_notice("deactivated group with group_id=%lu", group_id); } +static void cancel_all_jobs(struct mx_mysql *mysql, uint64_t user_uid) { + + struct mx_mysql_stmt *stmt; + unsigned long long num_rows; + + /* we need to use a temporary table, because we can't select jobs on + * mxq_group.user_uid and implicitly modify mxq_group in the same + * statement. mxq_group is implicitly updated by the triggers when we + * update mxq_job. */ + + stmt = mx_mysql_statement_prepare(mysql, + "CREATE TEMPORARY TABLE j (job_id INT8 UNSIGNED NOT NULL PRIMARY KEY)" + ); + if (!stmt) + mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); + if (mx_mysql_statement_execute(stmt, NULL) < 0) + mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + mx_mysql_statement_close(&stmt); + + stmt = mx_mysql_statement_prepare(mysql, + "INSERT INTO j(job_id)" + " SELECT mxq_job.job_id FROM mxq_job,mxq_group" + " WHERE mxq_job.group_id = mxq_group.group_id" + " AND user_uid = ?" + " AND job_status IN (" + status_str(MXQ_JOB_STATUS_INQ) "," + status_str(MXQ_JOB_STATUS_ASSIGNED) "," + status_str(MXQ_JOB_STATUS_LOADED) "," + status_str(MXQ_JOB_STATUS_RUNNING) + " )" + ); + if (!stmt) + mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); + mx_mysql_statement_param_bind(stmt, 0, uint64, &user_uid); + if (mx_mysql_statement_execute(stmt, &num_rows) < 0) + mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + mx_mysql_statement_close(&stmt); + if (num_rows == 0) { + mx_log_notice("no running or pending jobs"); + return; + } + stmt = mx_mysql_statement_prepare(mysql, + "UPDATE mxq_job" + " SET job_cancelled=TRUE" + " WHERE job_id in (SELECT * from j)" + " AND job_status IN (" + status_str(MXQ_JOB_STATUS_INQ) "," + status_str(MXQ_JOB_STATUS_ASSIGNED) "," + status_str(MXQ_JOB_STATUS_LOADED) "," + status_str(MXQ_JOB_STATUS_RUNNING) + " )" + ); + if (!stmt) + mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); + if (mx_mysql_statement_execute(stmt, &num_rows) < 0) + mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + mx_mysql_statement_close(&stmt); + + if (num_rows == 0) + mx_log_notice("no running or pending jobs"); + else + mx_log_notice("cancelled %llu jobs", num_rows); +} + int main(int argc, char *argv[]) { struct mx_mysql *mysql = NULL; @@ -246,6 +313,7 @@ int main(int argc, char *argv[]) MX_OPTION_REQUIRED_ARG("user", 'u'), MX_OPTION_REQUIRED_ARG("group-id", 'g'), MX_OPTION_REQUIRED_ARG("job-id", 'J'), + MX_OPTION_NO_ARG("all", 'A'), MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'), MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'), @@ -264,6 +332,7 @@ int main(int argc, char *argv[]) arg_job_id = 0; arg_debug = 0; arg_uid = UINT64_UNSET; + int opt_all = 0; mx_log_level_set(MX_LOG_NOTICE); @@ -345,6 +414,10 @@ int main(int argc, char *argv[]) } break; + case 'A': + opt_all = 1; + break; + case 'M': arg_mysql_default_file = optctl.optarg; break; @@ -357,7 +430,7 @@ int main(int argc, char *argv[]) MX_GETOPT_FINISH(optctl, argc, argv); - if (!arg_group_id && !arg_job_id) { + if (!arg_group_id && !arg_job_id && !opt_all) { print_usage(); exit(EX_USAGE); } @@ -402,6 +475,9 @@ int main(int argc, char *argv[]) if (arg_group_id) cancel_group(mysql, arg_group_id, passwd->pw_uid); + if (opt_all) + cancel_all_jobs(mysql, passwd->pw_uid); + mx_mysql_finish(&mysql); mx_log_info("MySQL: Connection to database closed."); return 1; From 782839751b27bb9954a18d1f24857d5b5ecf0628 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sun, 18 Aug 2024 16:54:53 +0200 Subject: [PATCH 4/4] mxqkill: Factor out some common code --- mxqkill.c | 83 +++++++++++++++++++++---------------------------------- 1 file changed, 32 insertions(+), 51 deletions(-) diff --git a/mxqkill.c b/mxqkill.c index 87225f4..2615866 100644 --- a/mxqkill.c +++ b/mxqkill.c @@ -81,14 +81,31 @@ static void print_usage(void) ); } +static struct mx_mysql_stmt *mysql_prepare_or_die(struct mx_mysql *mysql, char *sql) { + struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, sql); + if (stmt == NULL) + mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); + return stmt; +} + +static unsigned long long int mysql_execute_or_die(struct mx_mysql_stmt *stmt) { + unsigned long long num_rows; + if (mx_mysql_statement_execute(stmt, &num_rows) < 0) + mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + mx_mysql_statement_close(&stmt); + return num_rows; +} + +static unsigned long long int mysql_prepare_and_execute_or_die(struct mx_mysql *mysql, char *sql) { + return mysql_execute_or_die(mysql_prepare_or_die(mysql, sql)); +} + static void verify_job_owner(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) { - struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, + struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, "SELECT user_uid FROM mxq_job, mxq_group" " WHERE mxq_job.group_id = mxq_group.group_id" " AND job_id = ?" ); - if (!stmt) - mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); mx_mysql_statement_param_bind(stmt, 0, uint64, &job_id); @@ -114,23 +131,16 @@ static void verify_job_owner(struct mx_mysql *mysql, uint64_t job_id, uint64_t u } static void set_job_cancelled(struct mx_mysql *mysql, uint64_t job_id) { - struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, + struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, "UPDATE mxq_job SET job_cancelled = TRUE" " WHERE job_id = ?" ); - if (!stmt) - mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); mx_mysql_statement_param_bind(stmt, 0, uint64, &job_id); - unsigned long long num_rows; - int res = mx_mysql_statement_execute(stmt, &num_rows); - if (res < 0) - mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + unsigned long long num_rows = mysql_execute_or_die(stmt); if (num_rows == 0) mx_die("no such job_id %lu\n", job_id); - - mx_mysql_statement_close(&stmt); } static void cancel_job(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) { @@ -140,12 +150,10 @@ static void cancel_job(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_ui } static void verify_group_owner(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) { - struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, + struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, "SELECT user_uid FROM mxq_group" " WHERE group_id = ?" ); - if (!stmt) - mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id); @@ -171,43 +179,30 @@ static void verify_group_owner(struct mx_mysql *mysql, uint64_t group_id, uint64 } static void set_group_cancelled(struct mx_mysql *mysql, uint64_t group_id) { - struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, + struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, "UPDATE mxq_group SET group_status = " status_str(MXQ_GROUP_STATUS_CANCELLED) " WHERE group_id = ?" ); - if (!stmt) - mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id); - unsigned long long num_rows; - int res = mx_mysql_statement_execute(stmt, &num_rows); - if (res < 0) - mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + unsigned long long num_rows = mysql_execute_or_die(stmt); if (num_rows == 0) mx_die("no such group_id %lu\n", group_id); - - mx_mysql_statement_close(&stmt); } static void cancel_pending_jobs(struct mx_mysql *mysql, uint64_t group_id) { - struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, + struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, "UPDATE mxq_job SET job_cancelled = TRUE" " WHERE group_id = ?" " AND job_status = " status_str(MXQ_JOB_STATUS_INQ) ); - if (!stmt) - mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id); - unsigned long long num_rows; - int res = mx_mysql_statement_execute(stmt, &num_rows); - if (res < 0) - mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + unsigned long long num_rows = mysql_execute_or_die(stmt); if (num_rows) mx_log_notice("cancelled %llu jobs in group with group_id=%lu", num_rows, group_id); - mx_mysql_statement_close(&stmt); } static void cancel_group(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) { @@ -229,16 +224,11 @@ static void cancel_all_jobs(struct mx_mysql *mysql, uint64_t user_uid) { * statement. mxq_group is implicitly updated by the triggers when we * update mxq_job. */ - stmt = mx_mysql_statement_prepare(mysql, + mysql_prepare_and_execute_or_die(mysql, "CREATE TEMPORARY TABLE j (job_id INT8 UNSIGNED NOT NULL PRIMARY KEY)" ); - if (!stmt) - mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); - if (mx_mysql_statement_execute(stmt, NULL) < 0) - mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); - mx_mysql_statement_close(&stmt); - stmt = mx_mysql_statement_prepare(mysql, + stmt = mysql_prepare_or_die(mysql, "INSERT INTO j(job_id)" " SELECT mxq_job.job_id FROM mxq_job,mxq_group" " WHERE mxq_job.group_id = mxq_group.group_id" @@ -250,17 +240,14 @@ static void cancel_all_jobs(struct mx_mysql *mysql, uint64_t user_uid) { status_str(MXQ_JOB_STATUS_RUNNING) " )" ); - if (!stmt) - mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); mx_mysql_statement_param_bind(stmt, 0, uint64, &user_uid); - if (mx_mysql_statement_execute(stmt, &num_rows) < 0) - mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); - mx_mysql_statement_close(&stmt); + num_rows = mysql_execute_or_die(stmt); if (num_rows == 0) { mx_log_notice("no running or pending jobs"); return; } - stmt = mx_mysql_statement_prepare(mysql, + + num_rows = mysql_prepare_and_execute_or_die(mysql, "UPDATE mxq_job" " SET job_cancelled=TRUE" " WHERE job_id in (SELECT * from j)" @@ -271,12 +258,6 @@ static void cancel_all_jobs(struct mx_mysql *mysql, uint64_t user_uid) { status_str(MXQ_JOB_STATUS_RUNNING) " )" ); - if (!stmt) - mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); - if (mx_mysql_statement_execute(stmt, &num_rows) < 0) - mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); - mx_mysql_statement_close(&stmt); - if (num_rows == 0) mx_log_notice("no running or pending jobs"); else