diff --git a/mxqkill.c b/mxqkill.c index 05890a3..2615866 100644 --- a/mxqkill.c +++ b/mxqkill.c @@ -42,6 +42,7 @@ static void print_usage(void) " %s [options]\n" " %s [options] --group-id=GROUPID\n" " %s [options] --job-id=JOBID\n" + " %s [options] --all\n" "\n" "Synopsis:\n" " kill/cancel jobs running in MXQ cluster\n" @@ -50,6 +51,7 @@ static void print_usage(void) "\n" " -g, --group-id=GROUPID cancel/kill group \n" " -J, --job-id=JOBID cancel job \n" + " -A, --all cancel all running and pending jobs\n" "\n" " -v, --verbose be more verbose\n" " --debug set debug log level (default: warning log level)\n" @@ -73,90 +75,37 @@ static void print_usage(void) program_invocation_short_name, program_invocation_short_name, program_invocation_short_name, + program_invocation_short_name, MXQ_MYSQL_DEFAULT_FILE_STR, MXQ_MYSQL_DEFAULT_GROUP_STR ); } -static int update_group_status_cancelled(struct mx_mysql *mysql, struct mxq_group *g) -{ - struct mx_mysql_stmt *stmt = NULL; - unsigned long long num_rows = 0; - int res; - - assert(g->group_id); - - stmt = mx_mysql_statement_prepare(mysql, - "UPDATE mxq_group SET" - " group_status = " status_str(MXQ_GROUP_STATUS_CANCELLED) - " WHERE group_id = ?" - " AND group_status = " status_str(MXQ_GROUP_STATUS_OK) - " AND user_uid = ?" - " AND group_jobs-group_jobs_finished-group_jobs_failed-group_jobs_cancelled-group_jobs_unknown > 0" - ); - if (!stmt) { - mx_log_err("mx_mysql_statement_prepare(): %s", mx_mysql_error()); - return -(errno=EIO); - } - - mx_mysql_statement_param_bind(stmt, 0, uint64, &(g->group_id)); - mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); - - res = mx_mysql_statement_execute(stmt, &num_rows); - - if (res < 0) - mx_log_err("mx_mysql_statement_execute(): %s", mx_mysql_error()); - - mx_mysql_statement_close(&stmt); - - if (res < 0) - return -(errno=-res); - - assert(num_rows <= 1); - return (int)num_rows; +static struct mx_mysql_stmt *mysql_prepare_or_die(struct mx_mysql *mysql, char *sql) { + struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, sql); + if (stmt == NULL) + mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); + return stmt; } -static int update_job_status_cancelled_by_group(struct mx_mysql *mysql, struct mxq_group *g) -{ - struct mx_mysql_stmt *stmt = NULL; - unsigned long long num_rows = 0; - int res; - - assert(g->group_id); - - stmt = mx_mysql_statement_prepare(mysql, - "UPDATE mxq_job SET job_cancelled = TRUE" - " WHERE group_id = ?" - " AND job_status = " status_str(MXQ_JOB_STATUS_INQ) - ); - if (!stmt) { - mx_log_err("mx_mysql_statement_prepare(): %s", mx_mysql_error()); - return -(errno=EIO); - } - - mx_mysql_statement_param_bind(stmt, 0, uint64, &(g->group_id)); - - res = mx_mysql_statement_execute(stmt, &num_rows); - - if (res < 0) - mx_log_err("mx_mysql_statement_execute(): %s", mx_mysql_error()); - +static unsigned long long int mysql_execute_or_die(struct mx_mysql_stmt *stmt) { + unsigned long long num_rows; + if (mx_mysql_statement_execute(stmt, &num_rows) < 0) + mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); mx_mysql_statement_close(&stmt); + return num_rows; +} - if (res < 0) - return -(errno=-res); - - return (int)num_rows; +static unsigned long long int mysql_prepare_and_execute_or_die(struct mx_mysql *mysql, char *sql) { + return mysql_execute_or_die(mysql_prepare_or_die(mysql, sql)); } static void verify_job_owner(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) { - struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, + struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, "SELECT user_uid FROM mxq_job, mxq_group" " WHERE mxq_job.group_id = mxq_group.group_id" " AND job_id = ?" ); - if (!stmt) - mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); mx_mysql_statement_param_bind(stmt, 0, uint64, &job_id); @@ -182,35 +131,142 @@ static void verify_job_owner(struct mx_mysql *mysql, uint64_t job_id, uint64_t u } static void set_job_cancelled(struct mx_mysql *mysql, uint64_t job_id) { - struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql, + struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, "UPDATE mxq_job SET job_cancelled = TRUE" " WHERE job_id = ?" ); - if (!stmt) - mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error()); mx_mysql_statement_param_bind(stmt, 0, uint64, &job_id); + unsigned long long num_rows = mysql_execute_or_die(stmt); + if (num_rows == 0) + mx_die("no such job_id %lu\n", job_id); +} + +static void cancel_job(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) { + + verify_job_owner(mysql, job_id, user_uid); + set_job_cancelled(mysql, job_id); +} + +static void verify_group_owner(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) { + struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, + "SELECT user_uid FROM mxq_group" + " WHERE group_id = ?" + ); + + mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id); + unsigned long long num_rows; int res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error()); + if (num_rows == 0) - mx_die("no such job_id %lu\n", job_id); + mx_die("no such group_id %lu\n", group_id); + + uint64_t uid; + mx_mysql_statement_result_bind(stmt, 0, uint64, &uid); + + res = mx_mysql_statement_fetch(stmt); + if (res < 0) + mx_die("mx_mysql_statement_fetch: %s\n", mx_mysql_error()); + + if (uid != user_uid) + mx_die("group %lu: permission denied\n", group_id); mx_mysql_statement_close(&stmt); } -static void cancel_job(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) { +static void set_group_cancelled(struct mx_mysql *mysql, uint64_t group_id) { + struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, + "UPDATE mxq_group SET group_status = " status_str(MXQ_GROUP_STATUS_CANCELLED) + " WHERE group_id = ?" + ); - verify_job_owner(mysql, job_id, user_uid); - set_job_cancelled(mysql, job_id); + mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id); + + unsigned long long num_rows = mysql_execute_or_die(stmt); + if (num_rows == 0) + mx_die("no such group_id %lu\n", group_id); +} + +static void cancel_pending_jobs(struct mx_mysql *mysql, uint64_t group_id) { + struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, + "UPDATE mxq_job SET job_cancelled = TRUE" + " WHERE group_id = ?" + " AND job_status = " status_str(MXQ_JOB_STATUS_INQ) + ); + + mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id); + + unsigned long long num_rows = mysql_execute_or_die(stmt); + if (num_rows) + mx_log_notice("cancelled %llu jobs in group with group_id=%lu", num_rows, group_id); +} + +static void cancel_group(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) { + + verify_group_owner(mysql, group_id, user_uid); + set_group_cancelled(mysql, group_id); + cancel_pending_jobs(mysql, group_id); + mx_log_notice("marked all running jobs in group with group_id=%lu to be killed by executing servers.", group_id); + mx_log_notice("deactivated group with group_id=%lu", group_id); +} + +static void cancel_all_jobs(struct mx_mysql *mysql, uint64_t user_uid) { + + struct mx_mysql_stmt *stmt; + unsigned long long num_rows; + + /* we need to use a temporary table, because we can't select jobs on + * mxq_group.user_uid and implicitly modify mxq_group in the same + * statement. mxq_group is implicitly updated by the triggers when we + * update mxq_job. */ + + mysql_prepare_and_execute_or_die(mysql, + "CREATE TEMPORARY TABLE j (job_id INT8 UNSIGNED NOT NULL PRIMARY KEY)" + ); + + stmt = mysql_prepare_or_die(mysql, + "INSERT INTO j(job_id)" + " SELECT mxq_job.job_id FROM mxq_job,mxq_group" + " WHERE mxq_job.group_id = mxq_group.group_id" + " AND user_uid = ?" + " AND job_status IN (" + status_str(MXQ_JOB_STATUS_INQ) "," + status_str(MXQ_JOB_STATUS_ASSIGNED) "," + status_str(MXQ_JOB_STATUS_LOADED) "," + status_str(MXQ_JOB_STATUS_RUNNING) + " )" + ); + mx_mysql_statement_param_bind(stmt, 0, uint64, &user_uid); + num_rows = mysql_execute_or_die(stmt); + if (num_rows == 0) { + mx_log_notice("no running or pending jobs"); + return; + } + + num_rows = mysql_prepare_and_execute_or_die(mysql, + "UPDATE mxq_job" + " SET job_cancelled=TRUE" + " WHERE job_id in (SELECT * from j)" + " AND job_status IN (" + status_str(MXQ_JOB_STATUS_INQ) "," + status_str(MXQ_JOB_STATUS_ASSIGNED) "," + status_str(MXQ_JOB_STATUS_LOADED) "," + status_str(MXQ_JOB_STATUS_RUNNING) + " )" + ); + if (num_rows == 0) + mx_log_notice("no running or pending jobs"); + else + mx_log_notice("cancelled %llu jobs", num_rows); } int main(int argc, char *argv[]) { struct mx_mysql *mysql = NULL; - struct mxq_group group; uid_t ruid, euid, suid; struct passwd *passwd = NULL; @@ -238,6 +294,7 @@ int main(int argc, char *argv[]) MX_OPTION_REQUIRED_ARG("user", 'u'), MX_OPTION_REQUIRED_ARG("group-id", 'g'), MX_OPTION_REQUIRED_ARG("job-id", 'J'), + MX_OPTION_NO_ARG("all", 'A'), MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'), MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'), @@ -256,6 +313,7 @@ int main(int argc, char *argv[]) arg_job_id = 0; arg_debug = 0; arg_uid = UINT64_UNSET; + int opt_all = 0; mx_log_level_set(MX_LOG_NOTICE); @@ -337,6 +395,10 @@ int main(int argc, char *argv[]) } break; + case 'A': + opt_all = 1; + break; + case 'M': arg_mysql_default_file = optctl.optarg; break; @@ -349,7 +411,7 @@ int main(int argc, char *argv[]) MX_GETOPT_FINISH(optctl, argc, argv); - if (!arg_group_id && !arg_job_id) { + if (!arg_group_id && !arg_job_id && !opt_all) { print_usage(); exit(EX_USAGE); } @@ -388,57 +450,14 @@ int main(int argc, char *argv[]) mx_log_info("MySQL: Connection to database established."); - if (arg_job_id) { + if (arg_job_id) cancel_job(mysql, arg_job_id, passwd->pw_uid); - mx_mysql_finish(&mysql); - return 0; - } - - if (arg_group_id) { - memset(&group, 0, sizeof(group)); - - group.group_id = arg_group_id; - group.user_uid = passwd->pw_uid; - group.user_name = passwd->pw_name; - - res = update_group_status_cancelled(mysql, &group); - - if (res <= 0) { - mx_mysql_finish(&mysql); - mx_log_info("MySQL: Connection to database closed."); - - if (res == 0) - mx_log_warning("no active group with group_id=%lu found for user=%s(%d)", - group.group_id, group.user_name, group.user_uid); - else - mx_log_err("cancelling group failed"); - return 1; - } - - assert(res == 1); - - res = update_job_status_cancelled_by_group(mysql, &group); - mx_mysql_finish(&mysql); - mx_log_info("MySQL: Connection to database closed."); + if (arg_group_id) + cancel_group(mysql, arg_group_id, passwd->pw_uid); - if (res == -1 && errno == ENOENT) - res=0; - - if (res >= 0) { - if (res) - mx_log_notice("cancelled %d jobs in group with group_id=%lu", - res, group.group_id); - mx_log_notice("marked all running jobs in group with group_id=%lu to be killed by executing servers.", - group.group_id); - mx_log_notice("deactivated group with group_id=%lu", - group.group_id); - return 0; - } else { - mx_log_err("cancelling jobs failed"); - return 1; - } - } + if (opt_all) + cancel_all_jobs(mysql, passwd->pw_uid); mx_mysql_finish(&mysql); mx_log_info("MySQL: Connection to database closed.");