diff --git a/mxqd.c b/mxqd.c
index 021bf786..4c65a2a9 100644
--- a/mxqd.c
+++ b/mxqd.c
@@ -1809,6 +1809,79 @@ static int killall_cancelled(struct ppidcache *ppidcache, struct mxq_server *ser
     return 0;
 }
 
+/*
+ * Send KILLEVENT_CANCEL to the locally tracked job whose id is job_id.
+ *
+ * Walks server->users, each user's groups and each group's jobs and stops
+ * at the first job with a matching id. Does nothing if no job matches.
+ */
+static void kill_by_jobid(struct ppidcache *ppidcache, struct mxq_server *server, uint64_t job_id) {
+    for (struct mxq_user_list *ulist = server->users; ulist; ulist = ulist->next) {
+        for (struct mxq_group_list *glist = ulist->groups; glist; glist = glist->next) {
+            for (struct mxq_job_list *jlist = glist->jobs; jlist; jlist = jlist->next) {
+                if (jlist->job.job_id == job_id) {
+                    killstate_event(ppidcache, jlist, KILLEVENT_CANCEL);
+                    return;
+                }
+            }
+        }
+    }
+}
+
+/*
+ * Query the database for jobs of this daemon that are still in running
+ * state but flagged as cancelled, and send each one a cancel kill event.
+ *
+ * Rows are matched on daemon->hostname and daemon->daemon_name. A database
+ * error is logged and ends the pass; jobs already handled stay handled.
+ */
+static void kill_cancelled_jobs(struct ppidcache *ppidcache, struct mxq_server *server) {
+    struct mx_mysql *mysql = server->mysql;
+    struct mxq_daemon *daemon = &server->daemon;
+
+    /* cleanup attribute: mx_mysql_statement_close is run on stmt at scope exit */
+    __attribute__((cleanup(mx_mysql_statement_close)))
+    struct mx_mysql_stmt *stmt = mx_mysql_statement_prepare(mysql,
+        "SELECT job_id FROM mxq_job"
+        " WHERE job_status = " status_str(MXQ_JOB_STATUS_RUNNING)
+        " AND job_cancelled"
+        " AND host_hostname = ?"
+        " AND server_id = ?"
+    );
+    if (!stmt) {
+        mx_log_err("mx_mysql_statement_prepare: %s\n", mx_mysql_error());
+        return;
+    }
+
+    mx_mysql_statement_param_bind(stmt, 0, string, &daemon->hostname);
+    mx_mysql_statement_param_bind(stmt, 1, string, &daemon->daemon_name);
+
+    unsigned long long num_rows = 0;
+    int res = mx_mysql_statement_execute(stmt, &num_rows);
+    if (res < 0) {
+        mx_log_err("mx_mysql_statement_execute: %s\n", mx_mysql_error());
+        return;
+    }
+    if (num_rows == 0) {
+        return;
+    }
+
+    /* The result is bound with type uint64; use a 64-bit destination so the
+     * binding cannot write past an unsigned long on ABIs where that is 32 bits. */
+    uint64_t job_id = 0;
+    mx_mysql_statement_result_bind(stmt, 0, uint64, &job_id);
+
+    for (unsigned long long i = 0; i < num_rows; i++) {
+        res = mx_mysql_statement_fetch(stmt);
+        if (res < 0) {
+            mx_log_err("mx_mysql_statement_fetch: %s\n", mx_mysql_error());
+            return;
+        }
+        mx_log_debug("kill running job id %llu", (unsigned long long)job_id);
+        kill_by_jobid(ppidcache, server, job_id);
+    }
+}
+
 static void rename_outfiles(struct mxq_server *server, struct mxq_group *group, struct mxq_job *job)
 {
     int res;
@@ -2541,6 +2614,7 @@ static void monitor_jobs(struct mxq_server *server)
     killall_cancelled(ppidcache, server);
     killall_over_time(ppidcache, server);
     killall_over_memory(ppidcache, server);
+    kill_cancelled_jobs(ppidcache, server);
     for (ulist = server->users; ulist; ulist = ulist->next) {
         for (glist = ulist->groups; glist; glist = glist->next) {
             for (jlist = glist->jobs; jlist; jlist = jlist->next)