Skip to content

Commit

Permalink
Merge pull request #160 from mariux64/update-mxqkill
Browse files Browse the repository at this point in the history
Update mxqkill
  • Loading branch information
donald authored Aug 22, 2024
2 parents 71db964 + 7828397 commit d535f81
Showing 1 changed file with 144 additions and 125 deletions.
269 changes: 144 additions & 125 deletions mxqkill.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ static void print_usage(void)
" %s [options]\n"
" %s [options] --group-id=GROUPID\n"
" %s [options] --job-id=JOBID\n"
" %s [options] --all\n"
"\n"
"Synopsis:\n"
" kill/cancel jobs running in MXQ cluster\n"
Expand All @@ -50,6 +51,7 @@ static void print_usage(void)
"\n"
" -g, --group-id=GROUPID cancel/kill group <GROUPID>\n"
" -J, --job-id=JOBID cancel job <JOBID>\n"
" -A, --all cancel all running and pending jobs\n"
"\n"
" -v, --verbose be more verbose\n"
" --debug set debug log level (default: warning log level)\n"
Expand All @@ -73,90 +75,37 @@ static void print_usage(void)
program_invocation_short_name,
program_invocation_short_name,
program_invocation_short_name,
program_invocation_short_name,
MXQ_MYSQL_DEFAULT_FILE_STR,
MXQ_MYSQL_DEFAULT_GROUP_STR
);
}

static int update_group_status_cancelled(struct mx_mysql *mysql, struct mxq_group *g)
{
struct mx_mysql_stmt *stmt = NULL;
unsigned long long num_rows = 0;
int res;

assert(g->group_id);

stmt = mx_mysql_statement_prepare(mysql,
"UPDATE mxq_group SET"
" group_status = " status_str(MXQ_GROUP_STATUS_CANCELLED)
" WHERE group_id = ?"
" AND group_status = " status_str(MXQ_GROUP_STATUS_OK)
" AND user_uid = ?"
" AND group_jobs-group_jobs_finished-group_jobs_failed-group_jobs_cancelled-group_jobs_unknown > 0"
);
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %s", mx_mysql_error());
return -(errno=EIO);
}

mx_mysql_statement_param_bind(stmt, 0, uint64, &(g->group_id));
mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid));

res = mx_mysql_statement_execute(stmt, &num_rows);

if (res < 0)
mx_log_err("mx_mysql_statement_execute(): %s", mx_mysql_error());

mx_mysql_statement_close(&stmt);

if (res < 0)
return -(errno=-res);

assert(num_rows <= 1);
return (int)num_rows;
/*
 * Prepare `sql` on the connection `mysql` and hand back the statement.
 * If preparation yields no statement, the error text is reported through
 * mx_die() instead of being returned to the caller.
 */
static struct mx_mysql_stmt *mysql_prepare_or_die(struct mx_mysql *mysql, char *sql) {
    struct mx_mysql_stmt *prepared;

    prepared = mx_mysql_statement_prepare(mysql, sql);
    if (!prepared) {
        mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error());
    }

    return prepared;
}

static int update_job_status_cancelled_by_group(struct mx_mysql *mysql, struct mxq_group *g)
{
struct mx_mysql_stmt *stmt = NULL;
unsigned long long num_rows = 0;
int res;

assert(g->group_id);

stmt = mx_mysql_statement_prepare(mysql,
"UPDATE mxq_job SET job_cancelled = TRUE"
" WHERE group_id = ?"
" AND job_status = " status_str(MXQ_JOB_STATUS_INQ)
);
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %s", mx_mysql_error());
return -(errno=EIO);
}

mx_mysql_statement_param_bind(stmt, 0, uint64, &(g->group_id));

res = mx_mysql_statement_execute(stmt, &num_rows);

if (res < 0)
mx_log_err("mx_mysql_statement_execute(): %s", mx_mysql_error());

/*
 * Execute the prepared statement `stmt`, close it, and return the row count
 * that mx_mysql_statement_execute() stored. A negative result from the
 * execute call is reported through mx_die().
 *
 * The statement is closed here, so callers must not use `stmt` afterwards
 * (in particular, no result rows can be fetched from it).
 */
static unsigned long long int mysql_execute_or_die(struct mx_mysql_stmt *stmt) {
    /* start from a defined value so the return is never indeterminate,
     * even if the execute call leaves the counter untouched */
    unsigned long long num_rows = 0;

    if (mx_mysql_statement_execute(stmt, &num_rows) < 0)
        mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error());

    mx_mysql_statement_close(&stmt);
    return num_rows;
}

if (res < 0)
return -(errno=-res);

return (int)num_rows;
/*
 * Convenience wrapper for statements without bound parameters: prepare
 * `sql`, execute it, and return the row count from mysql_execute_or_die().
 */
static unsigned long long int mysql_prepare_and_execute_or_die(struct mx_mysql *mysql, char *sql) {
    struct mx_mysql_stmt *prepared = mysql_prepare_or_die(mysql, sql);

    return mysql_execute_or_die(prepared);
}

static void verify_job_owner(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) {
struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql,
struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql,
"SELECT user_uid FROM mxq_job, mxq_group"
" WHERE mxq_job.group_id = mxq_group.group_id"
" AND job_id = ?"
);
if (!stmt)
mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error());

mx_mysql_statement_param_bind(stmt, 0, uint64, &job_id);

Expand All @@ -182,35 +131,142 @@ static void verify_job_owner(struct mx_mysql *mysql, uint64_t job_id, uint64_t u
}

static void set_job_cancelled(struct mx_mysql *mysql, uint64_t job_id) {
struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql,
struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql,
"UPDATE mxq_job SET job_cancelled = TRUE"
" WHERE job_id = ?"
);
if (!stmt)
mx_die("mx_mysql_statement_prepare(): %s\n", mx_mysql_error());

mx_mysql_statement_param_bind(stmt, 0, uint64, &job_id);

unsigned long long num_rows = mysql_execute_or_die(stmt);
if (num_rows == 0)
mx_die("no such job_id %lu\n", job_id);
}

/*
 * Cancel a single job on behalf of `user_uid`: the ownership check runs
 * first, and only then is the job flagged as cancelled.
 */
static void cancel_job(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid)
{
    /* order matters: never touch the job before the owner check passed */
    verify_job_owner(mysql, job_id, user_uid);

    set_job_cancelled(mysql, job_id);
}

/*
 * Check that the group `group_id` exists and that its user_uid column
 * equals `user_uid`.
 *
 * Calls mx_die() when the statement fails, when no row is found
 * ("no such group_id") or when the stored uid differs
 * ("permission denied"). The statement is closed before returning.
 */
static void verify_group_owner(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid) {
    struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql,
        "SELECT user_uid FROM mxq_group"
        " WHERE group_id = ?"
        );

    mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id);

    /* executed by hand rather than via mysql_execute_or_die(): that helper
     * closes the statement, but the result row is still needed below */
    unsigned long long num_rows;
    int res = mx_mysql_statement_execute(stmt, &num_rows);
    if (res < 0)
        mx_die("mx_mysql_statement_execute(): %s\n", mx_mysql_error());

    if (num_rows == 0)
        mx_die("no such group_id %lu\n", group_id);

    uint64_t uid;
    mx_mysql_statement_result_bind(stmt, 0, uint64, &uid);

    res = mx_mysql_statement_fetch(stmt);
    if (res < 0)
        mx_die("mx_mysql_statement_fetch: %s\n", mx_mysql_error());

    if (uid != user_uid)
        mx_die("group %lu: permission denied\n", group_id);

    mx_mysql_statement_close(&stmt);
}

static void cancel_job(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) {
/*
 * Set group_status of the mxq_group row `group_id` to
 * MXQ_GROUP_STATUS_CANCELLED.
 *
 * Calls mx_die() with "no such group_id" when the UPDATE reports zero rows.
 * NOTE(review): if the row count only covers rows whose value actually
 * changed, a group that is already cancelled would take the same path --
 * confirm against the mx_mysql layer.
 */
static void set_group_cancelled(struct mx_mysql *mysql, uint64_t group_id) {
    struct mx_mysql_stmt *stmt = mysql_prepare_or_die(mysql,
        "UPDATE mxq_group SET group_status = " status_str(MXQ_GROUP_STATUS_CANCELLED)
        " WHERE group_id = ?"
        );

    mx_mysql_statement_param_bind(stmt, 0, uint64, &group_id);

    unsigned long long num_rows = mysql_execute_or_die(stmt);
    if (num_rows == 0)
        mx_die("no such group_id %lu\n", group_id);
}

/*
 * Set job_cancelled = TRUE on every job of group `group_id` whose
 * job_status is MXQ_JOB_STATUS_INQ. A notice with the row count is logged
 * when that count is non-zero; a zero count is silent.
 */
static void cancel_pending_jobs(struct mx_mysql *mysql, uint64_t group_id)
{
    struct mx_mysql_stmt *update;
    unsigned long long cancelled;

    update = mysql_prepare_or_die(mysql,
        "UPDATE mxq_job SET job_cancelled = TRUE"
        " WHERE group_id = ?"
        " AND job_status = " status_str(MXQ_JOB_STATUS_INQ)
        );
    mx_mysql_statement_param_bind(update, 0, uint64, &group_id);

    cancelled = mysql_execute_or_die(update);
    if (cancelled != 0) {
        mx_log_notice("cancelled %llu jobs in group with group_id=%lu", cancelled, group_id);
    }
}

/*
 * Cancel the whole group `group_id` on behalf of `user_uid`.
 *
 * Steps, in order: ownership check, mark the group cancelled, flag its
 * queued jobs as cancelled, then log two notices about the group.
 */
static void cancel_group(struct mx_mysql *mysql, uint64_t group_id, uint64_t user_uid)
{
    /* nothing is modified unless the owner check passes */
    verify_group_owner(mysql, group_id, user_uid);

    set_group_cancelled(mysql, group_id);
    cancel_pending_jobs(mysql, group_id);

    mx_log_notice("marked all running jobs in group with group_id=%lu to be killed by executing servers.", group_id);
    mx_log_notice("deactivated group with group_id=%lu", group_id);
}

/*
 * Flag every job of user `user_uid` whose status is INQ, ASSIGNED, LOADED
 * or RUNNING as cancelled, and log how many rows were updated.
 *
 * Works in two steps through a temporary table `j`: first collect the
 * matching job ids, then update mxq_job for exactly those ids.
 */
static void cancel_all_jobs(struct mx_mysql *mysql, uint64_t user_uid)
{
    /* A temporary table is required: selecting jobs by mxq_group.user_uid
     * and implicitly modifying mxq_group cannot happen in one statement,
     * and the triggers fired by updating mxq_job do update mxq_group. */
    mysql_prepare_and_execute_or_die(mysql,
        "CREATE TEMPORARY TABLE j (job_id INT8 UNSIGNED NOT NULL PRIMARY KEY)"
        );

    /* step 1: collect the ids of this user's pending and active jobs */
    struct mx_mysql_stmt *collect = mysql_prepare_or_die(mysql,
        "INSERT INTO j(job_id)"
        " SELECT mxq_job.job_id FROM mxq_job,mxq_group"
        " WHERE mxq_job.group_id = mxq_group.group_id"
        " AND user_uid = ?"
        " AND job_status IN ("
        status_str(MXQ_JOB_STATUS_INQ) ","
        status_str(MXQ_JOB_STATUS_ASSIGNED) ","
        status_str(MXQ_JOB_STATUS_LOADED) ","
        status_str(MXQ_JOB_STATUS_RUNNING)
        " )"
        );
    mx_mysql_statement_param_bind(collect, 0, uint64, &user_uid);

    unsigned long long collected = mysql_execute_or_die(collect);
    if (collected == 0) {
        mx_log_notice("no running or pending jobs");
        return;
    }

    /* step 2: cancel the collected jobs; the status filter is repeated
     * because it is evaluated again at the time of this UPDATE */
    unsigned long long cancelled = mysql_prepare_and_execute_or_die(mysql,
        "UPDATE mxq_job"
        " SET job_cancelled=TRUE"
        " WHERE job_id in (SELECT * from j)"
        " AND job_status IN ("
        status_str(MXQ_JOB_STATUS_INQ) ","
        status_str(MXQ_JOB_STATUS_ASSIGNED) ","
        status_str(MXQ_JOB_STATUS_LOADED) ","
        status_str(MXQ_JOB_STATUS_RUNNING)
        " )"
        );

    if (cancelled != 0) {
        mx_log_notice("cancelled %llu jobs", cancelled);
    } else {
        mx_log_notice("no running or pending jobs");
    }
}

int main(int argc, char *argv[])
{
struct mx_mysql *mysql = NULL;
struct mxq_group group;

uid_t ruid, euid, suid;
struct passwd *passwd = NULL;
Expand Down Expand Up @@ -238,6 +294,7 @@ int main(int argc, char *argv[])
MX_OPTION_REQUIRED_ARG("user", 'u'),
MX_OPTION_REQUIRED_ARG("group-id", 'g'),
MX_OPTION_REQUIRED_ARG("job-id", 'J'),
MX_OPTION_NO_ARG("all", 'A'),

MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'),
MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'),
Expand All @@ -256,6 +313,7 @@ int main(int argc, char *argv[])
arg_job_id = 0;
arg_debug = 0;
arg_uid = UINT64_UNSET;
int opt_all = 0;

mx_log_level_set(MX_LOG_NOTICE);

Expand Down Expand Up @@ -337,6 +395,10 @@ int main(int argc, char *argv[])
}
break;

case 'A':
opt_all = 1;
break;

case 'M':
arg_mysql_default_file = optctl.optarg;
break;
Expand All @@ -349,7 +411,7 @@ int main(int argc, char *argv[])

MX_GETOPT_FINISH(optctl, argc, argv);

if (!arg_group_id && !arg_job_id) {
if (!arg_group_id && !arg_job_id && !opt_all) {
print_usage();
exit(EX_USAGE);
}
Expand Down Expand Up @@ -388,57 +450,14 @@ int main(int argc, char *argv[])

mx_log_info("MySQL: Connection to database established.");

if (arg_job_id) {
if (arg_job_id)
cancel_job(mysql, arg_job_id, passwd->pw_uid);
mx_mysql_finish(&mysql);
return 0;
}

if (arg_group_id) {
memset(&group, 0, sizeof(group));

group.group_id = arg_group_id;
group.user_uid = passwd->pw_uid;
group.user_name = passwd->pw_name;

res = update_group_status_cancelled(mysql, &group);

if (res <= 0) {
mx_mysql_finish(&mysql);
mx_log_info("MySQL: Connection to database closed.");

if (res == 0)
mx_log_warning("no active group with group_id=%lu found for user=%s(%d)",
group.group_id, group.user_name, group.user_uid);
else
mx_log_err("cancelling group failed");
return 1;
}

assert(res == 1);

res = update_job_status_cancelled_by_group(mysql, &group);

mx_mysql_finish(&mysql);
mx_log_info("MySQL: Connection to database closed.");
if (arg_group_id)
cancel_group(mysql, arg_group_id, passwd->pw_uid);

if (res == -1 && errno == ENOENT)
res=0;

if (res >= 0) {
if (res)
mx_log_notice("cancelled %d jobs in group with group_id=%lu",
res, group.group_id);
mx_log_notice("marked all running jobs in group with group_id=%lu to be killed by executing servers.",
group.group_id);
mx_log_notice("deactivated group with group_id=%lu",
group.group_id);
return 0;
} else {
mx_log_err("cancelling jobs failed");
return 1;
}
}
if (opt_all)
cancel_all_jobs(mysql, passwd->pw_uid);

mx_mysql_finish(&mysql);
mx_log_info("MySQL: Connection to database closed.");
Expand Down

0 comments on commit d535f81

Please sign in to comment.