Skip to content

Commit

Permalink
mxqkill: Add option --all to cancel all active jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
donald committed Aug 22, 2024
1 parent 4319485 commit a0c8d94
Showing 1 changed file with 77 additions and 1 deletion.
78 changes: 77 additions & 1 deletion mxqkill.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ static void print_usage(void)
" %s [options]\n"
" %s [options] --group-id=GROUPID\n"
" %s [options] --job-id=JOBID\n"
" %s [options] --all\n"
"\n"
"Synopsis:\n"
" kill/cancel jobs running in MXQ cluster\n"
Expand All @@ -50,6 +51,7 @@ static void print_usage(void)
"\n"
" -g, --group-id=GROUPID cancel/kill group <GROUPID>\n"
" -J, --job-id=JOBID cancel job <JOBID>\n"
" -A, --all cancel all running and pending jobs\n"
"\n"
" -v, --verbose be more verbose\n"
" --debug set debug log level (default: warning log level)\n"
Expand All @@ -73,6 +75,7 @@ static void print_usage(void)
program_invocation_short_name,
program_invocation_short_name,
program_invocation_short_name,
program_invocation_short_name,
MXQ_MYSQL_DEFAULT_FILE_STR,
MXQ_MYSQL_DEFAULT_GROUP_STR
);
Expand Down Expand Up @@ -216,6 +219,70 @@ static void cancel_group(struct mx_mysql *mysql, uint64_t group_id, uint64_t use
mx_log_notice("deactivated group with group_id=%lu", group_id);
}

static void cancel_all_jobs(struct mx_mysql *mysql, uint64_t user_uid) {

struct mx_mysql_stmt *stmt;
unsigned long long num_rows;

/* we need to use a temporary table, because we can't select jobs on
* mxq_group.user_uid and implicitly modify mxq_group in the same
* statement. mxq_group is implicitly updated by the triggers when we
* update mxq_job. */

stmt = mx_mysql_statement_prepare(mysql,
"CREATE TEMPORARY TABLE j (job_id INT8 UNSIGNED NOT NULL PRIMARY KEY)"
);
if (!stmt)
mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error());
if (mx_mysql_statement_execute(stmt, NULL) < 0)
mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error());
mx_mysql_statement_close(&stmt);

stmt = mx_mysql_statement_prepare(mysql,
"INSERT INTO j(job_id)"
" SELECT mxq_job.job_id FROM mxq_job,mxq_group"
" WHERE mxq_job.group_id = mxq_group.group_id"
" AND user_uid = ?"
" AND job_status IN ("
status_str(MXQ_JOB_STATUS_INQ) ","
status_str(MXQ_JOB_STATUS_ASSIGNED) ","
status_str(MXQ_JOB_STATUS_LOADED) ","
status_str(MXQ_JOB_STATUS_RUNNING)
" )"
);
if (!stmt)
mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error());
mx_mysql_statement_param_bind(stmt, 0, uint64, &user_uid);
if (mx_mysql_statement_execute(stmt, &num_rows) < 0)
mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error());
mx_mysql_statement_close(&stmt);
if (num_rows == 0) {
mx_log_notice("no running or pending jobs");
return;
}
stmt = mx_mysql_statement_prepare(mysql,
"UPDATE mxq_job"
" SET job_cancelled=TRUE"
" WHERE job_id in (SELECT * from j)"
" AND job_status IN ("
status_str(MXQ_JOB_STATUS_INQ) ","
status_str(MXQ_JOB_STATUS_ASSIGNED) ","
status_str(MXQ_JOB_STATUS_LOADED) ","
status_str(MXQ_JOB_STATUS_RUNNING)
" )"
);
if (!stmt)
mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error());
if (mx_mysql_statement_execute(stmt, &num_rows) < 0)
mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error());
mx_mysql_statement_close(&stmt);

if (num_rows == 0)
mx_log_notice("no running or pending jobs");
else
mx_log_notice("cancelled %llu jobs", num_rows);
}

int main(int argc, char *argv[])
{
struct mx_mysql *mysql = NULL;
Expand Down Expand Up @@ -246,6 +313,7 @@ int main(int argc, char *argv[])
MX_OPTION_REQUIRED_ARG("user", 'u'),
MX_OPTION_REQUIRED_ARG("group-id", 'g'),
MX_OPTION_REQUIRED_ARG("job-id", 'J'),
MX_OPTION_NO_ARG("all", 'A'),

MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'),
MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'),
Expand All @@ -264,6 +332,7 @@ int main(int argc, char *argv[])
arg_job_id = 0;
arg_debug = 0;
arg_uid = UINT64_UNSET;
int opt_all = 0;

mx_log_level_set(MX_LOG_NOTICE);

Expand Down Expand Up @@ -345,6 +414,10 @@ int main(int argc, char *argv[])
}
break;

case 'A':
opt_all = 1;
break;

case 'M':
arg_mysql_default_file = optctl.optarg;
break;
Expand All @@ -357,7 +430,7 @@ int main(int argc, char *argv[])

MX_GETOPT_FINISH(optctl, argc, argv);

if (!arg_group_id && !arg_job_id) {
if (!arg_group_id && !arg_job_id && !opt_all) {
print_usage();
exit(EX_USAGE);
}
Expand Down Expand Up @@ -402,6 +475,9 @@ int main(int argc, char *argv[])
if (arg_group_id)
cancel_group(mysql, arg_group_id, passwd->pw_uid);

if (opt_all)
cancel_all_jobs(mysql, passwd->pw_uid);

mx_mysql_finish(&mysql);
mx_log_info("MySQL: Connection to database closed.");
return 1;
Expand Down

0 comments on commit a0c8d94

Please sign in to comment.