Skip to content

Commit

Permalink
mxqd: Kill running jobs which were cancelled
Browse files Browse the repository at this point in the history
Periodically check the job_cancelled flags for the jobs run by this
daemon. If a cancellation of the job was requested, kill it, using the
existing mechanism.
  • Loading branch information
donald committed Dec 28, 2023
1 parent 0e29847 commit f7e2345
Showing 1 changed file with 55 additions and 0 deletions.
55 changes: 55 additions & 0 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1809,6 +1809,60 @@ static int killall_cancelled(struct ppidcache *ppidcache, struct mxq_server *ser
return 0;
}

/*
 * Search the server's user -> group -> job lists for the job whose id equals
 * job_id and deliver a KILLEVENT_CANCEL to it via killstate_event().
 *
 * Only the first matching entry is signalled; if no entry matches, nothing
 * happens.
 */
static void kill_by_jobid(struct ppidcache *ppidcache, struct mxq_server *server, unsigned long job_id) {
    struct mxq_user_list *user;
    struct mxq_group_list *group;
    struct mxq_job_list *entry;

    for (user = server->users; user; user = user->next) {
        for (group = user->groups; group; group = group->next) {
            for (entry = group->jobs; entry; entry = entry->next) {
                if (entry->job.job_id != job_id)
                    continue;
                /* Found it: hand over to the kill state machine and stop searching. */
                killstate_event(ppidcache, entry, KILLEVENT_CANCEL);
                return;
            }
        }
    }
}

/*
 * Ask the database for jobs that are in running state, have job_cancelled
 * set, and belong to this daemon (matched by hostname and daemon name), and
 * raise a cancel kill event for each of them through kill_by_jobid().
 *
 * Errors from preparing, executing or fetching the statement are logged and
 * the function returns without processing any further rows.
 */
static void kill_cancelled_jobs(struct ppidcache *ppidcache, struct mxq_server *server) {
    struct mx_mysql *mysql = server->mysql;
    struct mxq_daemon *daemon = &server->daemon;

    /* The cleanup attribute closes the statement on every exit from this scope. */
    __attribute__((cleanup(mx_mysql_statement_close)))
    struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql,
        "SELECT job_id FROM mxq_job"
        " WHERE job_status = " status_str(MXQ_JOB_STATUS_RUNNING)
        " AND job_cancelled"
        " AND host_hostname = ?"
        " AND server_id = ?"
    );
    if (!stmt) {
        mx_log_err("mx_mysql_statement_prepare: %s\n", mx_mysql_error());
        return;
    }

    /* Placeholders in query order: host_hostname, then server_id. */
    mx_mysql_statement_param_bind(stmt, 0, string, &daemon->hostname);
    mx_mysql_statement_param_bind(stmt, 1, string, &daemon->daemon_name);

    unsigned long long num_rows;
    int res = mx_mysql_statement_execute(stmt, &num_rows);
    if (res < 0) {
        mx_log_err("mx_mysql_statement_execute: %s\n", mx_mysql_error());
        return;
    }
    if (num_rows == 0)
        return;

    /*
     * NOTE(review): job_id is bound as uint64; this assumes unsigned long is
     * 64 bits wide on the build target -- confirm.
     */
    unsigned long job_id;
    mx_mysql_statement_result_bind(stmt, 0, uint64, &job_id);

    /* Counter uses the same type as num_rows so the comparison never narrows. */
    for (unsigned long long row = 0; row < num_rows; row++) {
        res = mx_mysql_statement_fetch(stmt);
        if (res < 0) {
            mx_log_err("mx_mysql_statement_fetch: %s\n", mx_mysql_error());
            return;
        }
        mx_log_debug("kill running job id %lu", job_id);
        kill_by_jobid(ppidcache, server, job_id);
    }
}

static void rename_outfiles(struct mxq_server *server, struct mxq_group *group, struct mxq_job *job)
{
int res;
Expand Down Expand Up @@ -2541,6 +2595,7 @@ static void monitor_jobs(struct mxq_server *server)
killall_cancelled(ppidcache, server);
killall_over_time(ppidcache, server);
killall_over_memory(ppidcache, server);
kill_cancelled_jobs(ppidcache, server);
for (ulist = server->users; ulist; ulist = ulist->next) {
for (glist = ulist->groups; glist; glist = glist->next) {
for (jlist = glist->jobs; jlist; jlist = jlist->next)
Expand Down

0 comments on commit f7e2345

Please sign in to comment.