Skip to content

Update mxqkill #160

Merged
merged 4 commits into from
Aug 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
269 changes: 144 additions & 125 deletions mxqkill.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ static void print_usage(void)
" %s [options]\n"
" %s [options] --group-id=GROUPID\n"
" %s [options] --job-id=JOBID\n"
" %s [options] --all\n"
"\n"
"Synopsis:\n"
" kill/cancel jobs running in MXQ cluster\n"
Expand All @@ -50,6 +51,7 @@ static void print_usage(void)
"\n"
" -g, --group-id=GROUPID cancel/kill group <GROUPID>\n"
" -J, --job-id=JOBID cancel job <JOBID>\n"
" -A, --all cancel all running and pending jobs\n"
"\n"
" -v, --verbose be more verbose\n"
" --debug set debug log level (default: warning log level)\n"
Expand All @@ -73,90 +75,37 @@ static void print_usage(void)
program_invocation_short_name,
program_invocation_short_name,
program_invocation_short_name,
program_invocation_short_name,
MXQ_MYSQL_DEFAULT_FILE_STR,
MXQ_MYSQL_DEFAULT_GROUP_STR
);
}

static int update_group_status_cancelled(struct mx_mysql *mysql, struct mxq_group *g)
{
struct mx_mysql_stmt *stmt = NULL;
unsigned long long num_rows = 0;
int res;

assert(g->group_id);

stmt = mx_mysql_statement_prepare(mysql,
"UPDATE mxq_group SET"
" group_status = " status_str(MXQ_GROUP_STATUS_CANCELLED)
" WHERE group_id = ?"
" AND group_status = " status_str(MXQ_GROUP_STATUS_OK)
" AND user_uid = ?"
" AND group_jobs-group_jobs_finished-group_jobs_failed-group_jobs_cancelled-group_jobs_unknown > 0"
);
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %s", mx_mysql_error());
return -(errno=EIO);
}

mx_mysql_statement_param_bind(stmt, 0, uint64, &(g->group_id));
mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid));

res = mx_mysql_statement_execute(stmt, &num_rows);

if (res < 0)
mx_log_err("mx_mysql_statement_execute(): %s", mx_mysql_error());

mx_mysql_statement_close(&stmt);

if (res < 0)
return -(errno=-res);

assert(num_rows <= 1);
return (int)num_rows;
/*
 * Prepare the SQL text `sql` on the connection `mysql`.
 *
 * A NULL result from mx_mysql_statement_prepare() is reported through
 * mx_die() together with mx_mysql_error(); otherwise the prepared statement
 * is handed back to the caller.
 * NOTE(review): relies on mx_die() not returning -- confirm in mx_log.
 */
static struct mx_mysql_stmt *mysql_prepare_or_die(struct mx_mysql *mysql, char *sql) {
    struct mx_mysql_stmt *prepared = mx_mysql_statement_prepare(mysql, sql);

    if (!prepared)
        mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error());

    return prepared;
}

static int update_job_status_cancelled_by_group(struct mx_mysql *mysql, struct mxq_group *g)
{
struct mx_mysql_stmt *stmt = NULL;
unsigned long long num_rows = 0;
int res;

assert(g->group_id);

stmt = mx_mysql_statement_prepare(mysql,
"UPDATE mxq_job SET job_cancelled = TRUE"
" WHERE group_id = ?"
" AND job_status = " status_str(MXQ_JOB_STATUS_INQ)
);
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %s", mx_mysql_error());
return -(errno=EIO);
}

mx_mysql_statement_param_bind(stmt, 0, uint64, &(g->group_id));

res = mx_mysql_statement_execute(stmt, &num_rows);

if (res < 0)
mx_log_err("mx_mysql_statement_execute(): %s", mx_mysql_error());

/*
 * Execute the prepared statement `stmt`, then close it.
 *
 * A negative result from mx_mysql_statement_execute() is reported through
 * mx_die() together with mx_mysql_error().
 * Returns the row count that mx_mysql_statement_execute() stored.
 */
static unsigned long long int mysql_execute_or_die(struct mx_mysql_stmt *stmt) {
    unsigned long long row_count;
    int res = mx_mysql_statement_execute(stmt, &row_count);

    if (res < 0)
        mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error());

    /* The statement is closed here, so callers must not use it afterwards. */
    mx_mysql_statement_close(&stmt);

    return row_count;
}

if (res < 0)
return -(errno=-res);

return (int)num_rows;
/*
 * Convenience wrapper for statements that need no bound parameters:
 * prepare `sql` on `mysql`, execute it and return the resulting row count.
 */
static unsigned long long int mysql_prepare_and_execute_or_die(struct mx_mysql *mysql, char *sql) {
    struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql, sql);

    return mysql_execute_or_die(stmt);
}

static void verify_job_owner(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) {
struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql,
struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql,
"SELECT user_uid FROM mxq_job, mxq_group"
" WHERE mxq_job.group_id = mxq_group.group_id"
" AND job_id = ?"
);
if (!stmt)
mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error());

mx_mysql_statement_param_bind(stmt, 0, uint64, &job_id);

Expand All @@ -182,35 +131,142 @@ static void verify_job_owner(struct mx_mysql *mysql, uint64_t job_id, uint64_t u
}

static void set_job_cancelled(struct mx_mysql *mysql, uint64_t job_id) {
struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql,
struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql,
"UPDATE mxq_job SET job_cancelled = TRUE"
" WHERE job_id = ?"
);
if (!stmt)
mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error());

mx_mysql_statement_param_bind(stmt, 0, uint64, &job_id);

unsigned long long num_rows = mysql_execute_or_die(stmt);
if (num_rows == 0)
mx_die("no such job_id %lu\n", job_id);
}

/*
 * Cancel the single job `job_id` on behalf of the user `user_uid`.
 *
 * verify_job_owner() runs before the job row is touched; only afterwards is
 * the job flagged as cancelled.
 */
static void cancel_job(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) {
    /* Ownership check first, modification second. */
    verify_job_owner(mysql, job_id, user_uid);
    set_job_cancelled(mysql, job_id);
}

/*
 * Check that the group `group_id` exists and belongs to `user_uid`.
 *
 * Looks up mxq_group.user_uid for the group. The following conditions are
 * reported through mx_die():
 *   - the SELECT fails to execute,
 *   - the SELECT yields no row ("no such group_id"),
 *   - fetching the row fails,
 *   - the stored uid differs from `user_uid` ("permission denied").
 * On success the statement is closed and the function returns normally.
 */
static void verify_group_owner(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) {
    struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql,
            "SELECT user_uid FROM mxq_group"
            " WHERE group_id = ?"
    );

    mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id);

    /* Executed by hand (not via mysql_execute_or_die) because the statement
     * must stay open for the result fetch below. */
    unsigned long long num_rows;
    int res = mx_mysql_statement_execute(stmt, &num_rows);
    if (res < 0)
        mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error());

    if (num_rows == 0)
        mx_die("no such group_id %lu\n", group_id);

    uint64_t uid;
    mx_mysql_statement_result_bind(stmt, 0, uint64, &uid);

    res = mx_mysql_statement_fetch(stmt);
    if (res < 0)
        mx_die("mx_mysql_statement_fetch: %s\n", mx_mysql_error());

    if (uid != user_uid)
        mx_die("group %lu: permission denied\n", group_id);

    mx_mysql_statement_close(&stmt);
}

/*
 * Set group_status of the mxq_group row `group_id` to
 * MXQ_GROUP_STATUS_CANCELLED.
 *
 * When the UPDATE reports zero rows, "no such group_id" is reported through
 * mx_die(). This function does not check who owns the group.
 *
 * NOTE(review): MySQL by default counts only rows whose values actually
 * changed, so zero rows may also mean the group was already cancelled --
 * confirm how mx_mysql reports affected rows.
 */
static void set_group_cancelled(struct mx_mysql *mysql, uint64_t group_id) {
    struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql,
            "UPDATE mxq_group SET group_status = " status_str(MXQ_GROUP_STATUS_CANCELLED)
            " WHERE group_id = ?"
    );

    mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id);

    unsigned long long num_rows = mysql_execute_or_die(stmt);
    if (num_rows == 0)
        mx_die("no such group_id %lu\n", group_id);
}

/*
 * Flag every job of group `group_id` whose job_status is MXQ_JOB_STATUS_INQ
 * as cancelled (job_cancelled = TRUE).
 *
 * Logs a notice with the number of rows reported by the UPDATE; nothing is
 * logged when that number is zero.
 */
static void cancel_pending_jobs(struct mx_mysql *mysql, uint64_t group_id) {
    struct mx_mysql_stmt *update = mysql_prepare_or_die(mysql,
            "UPDATE mxq_job SET job_cancelled = TRUE"
            " WHERE group_id = ?"
            " AND job_status = " status_str(MXQ_JOB_STATUS_INQ)
    );
    mx_mysql_statement_param_bind(update, 0, uint64, &group_id);

    unsigned long long cancelled = mysql_execute_or_die(update);
    if (cancelled == 0)
        return;

    mx_log_notice("cancelled %llu jobs in group with group_id=%lu", cancelled, group_id);
}

/*
 * Cancel the whole group `group_id` on behalf of the user `user_uid`.
 *
 * Order of operations:
 *   1. verify that the group exists and is owned by `user_uid`,
 *   2. mark the group itself as cancelled,
 *   3. flag the group's jobs that are still in the queue as cancelled,
 *   4. log two notices describing the outcome.
 */
static void cancel_group(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) {
    verify_group_owner(mysql, group_id, user_uid);

    set_group_cancelled(mysql, group_id);
    cancel_pending_jobs(mysql, group_id);

    mx_log_notice("marked all running jobs in group with group_id=%lu to be killed by executing servers.", group_id);
    mx_log_notice("deactivated group with group_id=%lu", group_id);
}

/* Comma-separated job states matched by both statements in cancel_all_jobs(). */
#define CANCEL_ALL_JOB_STATUS_LIST \
    status_str(MXQ_JOB_STATUS_INQ) "," \
    status_str(MXQ_JOB_STATUS_ASSIGNED) "," \
    status_str(MXQ_JOB_STATUS_LOADED) "," \
    status_str(MXQ_JOB_STATUS_RUNNING)

/*
 * Flag every job of user `user_uid` that is in one of the states listed in
 * CANCEL_ALL_JOB_STATUS_LIST as cancelled (job_cancelled = TRUE).
 *
 * Logs either the number of rows reported by the UPDATE or
 * "no running or pending jobs".
 */
static void cancel_all_jobs(struct mx_mysql *mysql, uint64_t user_uid) {
    /*
     * The candidate job ids are staged in a temporary table: one statement
     * cannot select jobs on mxq_group.user_uid and at the same time modify
     * mxq_group, and updating mxq_job modifies mxq_group implicitly through
     * the triggers.
     */
    mysql_prepare_and_execute_or_die(mysql,
            "CREATE TEMPORARY TABLE j (job_id INT8 UNSIGNED NOT NULL PRIMARY KEY)"
    );

    struct mx_mysql_stmt *collect = mysql_prepare_or_die(mysql,
            "INSERT INTO j(job_id)"
            " SELECT mxq_job.job_id FROM mxq_job,mxq_group"
            " WHERE mxq_job.group_id = mxq_group.group_id"
            " AND user_uid = ?"
            " AND job_status IN ("
            CANCEL_ALL_JOB_STATUS_LIST
            " )"
    );
    mx_mysql_statement_param_bind(collect, 0, uint64, &user_uid);

    unsigned long long candidates = mysql_execute_or_die(collect);
    if (candidates == 0) {
        mx_log_notice("no running or pending jobs");
        return;
    }

    /* The status filter is applied again when the staged jobs are updated. */
    unsigned long long cancelled = mysql_prepare_and_execute_or_die(mysql,
            "UPDATE mxq_job"
            " SET job_cancelled=TRUE"
            " WHERE job_id in (SELECT * from j)"
            " AND job_status IN ("
            CANCEL_ALL_JOB_STATUS_LIST
            " )"
    );

    if (cancelled > 0)
        mx_log_notice("cancelled %llu jobs", cancelled);
    else
        mx_log_notice("no running or pending jobs");
}

#undef CANCEL_ALL_JOB_STATUS_LIST

int main(int argc, char *argv[])
{
struct mx_mysql *mysql = NULL;
struct mxq_group group;

uid_t ruid, euid, suid;
struct passwd *passwd = NULL;
Expand Down Expand Up @@ -238,6 +294,7 @@ int main(int argc, char *argv[])
MX_OPTION_REQUIRED_ARG("user", 'u'),
MX_OPTION_REQUIRED_ARG("group-id", 'g'),
MX_OPTION_REQUIRED_ARG("job-id", 'J'),
MX_OPTION_NO_ARG("all", 'A'),

MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'),
MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'),
Expand All @@ -256,6 +313,7 @@ int main(int argc, char *argv[])
arg_job_id = 0;
arg_debug = 0;
arg_uid = UINT64_UNSET;
int opt_all = 0;

mx_log_level_set(MX_LOG_NOTICE);

Expand Down Expand Up @@ -337,6 +395,10 @@ int main(int argc, char *argv[])
}
break;

case 'A':
opt_all = 1;
break;

case 'M':
arg_mysql_default_file = optctl.optarg;
break;
Expand All @@ -349,7 +411,7 @@ int main(int argc, char *argv[])

MX_GETOPT_FINISH(optctl, argc, argv);

if (!arg_group_id && !arg_job_id) {
if (!arg_group_id && !arg_job_id && !opt_all) {
print_usage();
exit(EX_USAGE);
}
Expand Down Expand Up @@ -388,57 +450,14 @@ int main(int argc, char *argv[])

mx_log_info("MySQL: Connection to database established.");

if (arg_job_id) {
if (arg_job_id)
cancel_job(mysql, arg_job_id, passwd->pw_uid);
mx_mysql_finish(&mysql);
return 0;
}

if (arg_group_id) {
memset(&group, 0, sizeof(group));

group.group_id = arg_group_id;
group.user_uid = passwd->pw_uid;
group.user_name = passwd->pw_name;

res = update_group_status_cancelled(mysql, &group);

if (res <= 0) {
mx_mysql_finish(&mysql);
mx_log_info("MySQL: Connection to database closed.");

if (res == 0)
mx_log_warning("no active group with group_id=%lu found for user=%s(%d)",
group.group_id, group.user_name, group.user_uid);
else
mx_log_err("cancelling group failed");
return 1;
}

assert(res == 1);

res = update_job_status_cancelled_by_group(mysql, &group);

mx_mysql_finish(&mysql);
mx_log_info("MySQL: Connection to database closed.");
if (arg_group_id)
cancel_group(mysql, arg_group_id, passwd->pw_uid);

if (res == -1 && errno == ENOENT)
res=0;

if (res >= 0) {
if (res)
mx_log_notice("cancelled %d jobs in group with group_id=%lu",
res, group.group_id);
mx_log_notice("marked all running jobs in group with group_id=%lu to be killed by executing servers.",
group.group_id);
mx_log_notice("deactivated group with group_id=%lu",
group.group_id);
return 0;
} else {
mx_log_err("cancelling jobs failed");
return 1;
}
}
if (opt_all)
cancel_all_jobs(mysql, passwd->pw_uid);

mx_mysql_finish(&mysql);
mx_log_info("MySQL: Connection to database closed.");
Expand Down