From 2ac8098c8a26f3ac2e998bd8d349518bd9718f24 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Fri, 20 Aug 2021 13:05:49 +0200 Subject: [PATCH] mxqd: Add `killstate` state machine When we kill a job because it is over time, over memory or has been cancelled, we want to send a sequence of different signals. Currently this is impemented for "over memory" and "over time" only and with two different mechanisms. "cancel job" currently sends SIGTERM only so that some jobs are not correctly terminated. Unify the mechanism in a state machine. The state is kept in two new fields `job_killstate` and `next_signal_time` in the servers job structures. We mimic the current kill sequences but add SIGKILL for canceled jobs after 30 seconds if they don't go away from SIGTERM. After we've sent an initial SIGKILL, we will continue to send more SIGKILLS to terminate any child we mioght have missed. The state machines uses ppidcache to send the signals to all descendants of the jobs host_pid (which is the pid of the reaper). With this commit, the state machine is added but not yet used. --- Makefile | 1 + mxqd.c | 143 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ mxqd.h | 15 ++++++ 3 files changed, 159 insertions(+) diff --git a/Makefile b/Makefile index ae84325c..4c471973 100644 --- a/Makefile +++ b/Makefile @@ -534,6 +534,7 @@ clean: CLEAN += ppidcache.o mxqd: mx_flock.o mxqd: mx_util.o mxqd: mx_proc.o +mxqd: ppidcache.o mxqd: mx_log.o mxqd: mxq_log.o mxqd: mx_getopt.o diff --git a/mxqd.c b/mxqd.c index 3941912f..8f95f150 100644 --- a/mxqd.c +++ b/mxqd.c @@ -45,6 +45,7 @@ #include "mxqd_control.h" #include "keywordset.h" #include "parser.tab.h" +#include "ppidcache.h" #ifndef MXQ_INITIAL_PATH # define MXQ_INITIAL_PATH "/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin" @@ -1650,6 +1651,148 @@ void server_close(struct mxq_server *server) server_free(server); } +static int signal_descendants_cb(void *data, pid_t pid) { + int signal = *(int *)data; + if (signal != SIGKILL) + kill(pid, SIGCONT); + kill(pid, signal); + return 1; +} + +static void signal_descendants(struct ppidcache *ppidcache, pid_t pid, int signal) +{ + ppidcache_do_descendants(ppidcache, pid, signal_descendants_cb, &signal); +} + +static void signal_job(struct ppidcache *ppidcache, struct mxq_job_list *jlist, int signal) +{ + mx_log_info("sending signal=%d to job=%s(%d):%lu:%lu", + signal, + jlist->group->group.user_name, jlist->group->group.user_uid, + jlist->group->group.group_id, jlist->job.job_id); + + signal_descendants(ppidcache, jlist->job.host_pid, signal); +} + +/* +* State machine for "kill" events to jobs. +* +* Signals to be sent: +* +* job is over time: : SIGXCPU , after +5% group-time SIGTERM , after 10% group time + 10 minutes SIGKILL +* job is over memory : SIGTERM , after 10 seconds SIGKILL +* job is cancelled : SIGTERM , after 30 seconds SIGKILL +* +* Once KILL is sent, this is repeated every 30 seconds to terminate any child we might have missed. +* +* Events; +* +* CHECK : time passed, check timeouts +* OVERTIME : job is over time +* OVERMEMORY : job is over memory +* CANCEL : job is cancelled (user or non-gracefull server shutdown) +* +* States: +* +* RUNNING : (initial) +* WAIT_TERM : (after overtime) XCPU has been sent, waiting for timeout to send TERM and KILL +* WAIT_KILL : TERM has been sent, waiting for timeout to send (next) KILL +* */ + +static void killstate_event(struct ppidcache *ppidcache, struct mxq_job_list *jlist, enum job_killevent event) +{ + time_t uptime_seconds = mx_clock_boottime(); + + switch (jlist->killstate) { + case KILLSTATE_RUNNING: + switch (event) { + case KILLEVENT_CHECK: + break; + case KILLEVENT_OVERTIME: + mx_log_info("job=%s(%d):%lu:%lu exceeded time limit", + jlist->group->group.user_name, jlist->group->group.user_uid, + jlist->group->group.group_id, jlist->job.job_id); + signal_job(ppidcache, jlist, SIGXCPU); + jlist->killstate = KILLSTATE_WAIT_TERM; + jlist->next_signal_at_uptime_seconds = uptime_seconds + jlist->group->group.job_time * 3; // 0.05 * job_time*60 + break; + case KILLEVENT_OVERMEMORY: + mx_log_info("job=%s(%d):%lu:%lu exceeded memory limit", + jlist->group->group.user_name, jlist->group->group.user_uid, + jlist->group->group.group_id, jlist->job.job_id); + signal_job(ppidcache, jlist, SIGTERM); + jlist->killstate = KILLSTATE_WAIT_KILL; + jlist->next_signal_at_uptime_seconds = uptime_seconds + 10; + break; + case KILLEVENT_CANCEL: + mx_log_info("job=%s(%d):%lu:%lu cancelled", + jlist->group->group.user_name, jlist->group->group.user_uid, + jlist->group->group.group_id, jlist->job.job_id); + signal_job(ppidcache, jlist, SIGTERM); + jlist->killstate = KILLSTATE_WAIT_KILL; + jlist->next_signal_at_uptime_seconds = uptime_seconds + 30; + break; + } + break; + case KILLSTATE_WAIT_TERM: + switch (event) { + case KILLEVENT_CHECK: + if (uptime_seconds >= jlist->next_signal_at_uptime_seconds) { + signal_job(ppidcache, jlist, SIGTERM); + jlist->killstate = KILLSTATE_WAIT_KILL; + jlist->next_signal_at_uptime_seconds = uptime_seconds + jlist->group->group.job_time * 6 + 600; // 0.10 * job_time*60 + 10*60 + } + break; + case KILLEVENT_OVERTIME: + break; + case KILLEVENT_OVERMEMORY: + mx_log_info("job=%s(%d):%lu:%lu exceeded memory limit", + jlist->group->group.user_name, jlist->group->group.user_uid, + jlist->group->group.group_id, jlist->job.job_id); + signal_job(ppidcache, jlist, SIGTERM); + jlist->killstate = KILLSTATE_WAIT_KILL; + jlist->next_signal_at_uptime_seconds = uptime_seconds + 10; + break; + case KILLEVENT_CANCEL: + mx_log_info("job=%s(%d):%lu:%lu cancelled", + jlist->group->group.user_name, jlist->group->group.user_uid, + jlist->group->group.group_id, jlist->job.job_id); + signal_job(ppidcache, jlist, SIGTERM); + jlist->killstate = KILLSTATE_WAIT_KILL; + jlist->next_signal_at_uptime_seconds = uptime_seconds + 30; + break; + } + break; + case KILLSTATE_WAIT_KILL: + switch (event) { + case KILLEVENT_CHECK: + if (uptime_seconds >= jlist->next_signal_at_uptime_seconds) { + signal_job(ppidcache, jlist, SIGKILL); + jlist->next_signal_at_uptime_seconds = uptime_seconds + 30; + } + break; + case KILLEVENT_OVERTIME: + break; + case KILLEVENT_OVERMEMORY: + if (jlist->next_signal_at_uptime_seconds > uptime_seconds + 10) { + mx_log_info("job=%s(%d):%lu:%lu exceeded memory limit", + jlist->group->group.user_name, jlist->group->group.user_uid, + jlist->group->group.group_id, jlist->job.job_id); + jlist->next_signal_at_uptime_seconds = uptime_seconds + 10; + } + break; + case KILLEVENT_CANCEL: + if (jlist->next_signal_at_uptime_seconds > uptime_seconds + 30) { + mx_log_info("job=%s(%d):%lu:%lu cancelled", + jlist->group->group.user_name, jlist->group->group.user_uid, + jlist->group->group.group_id, jlist->job.job_id); + } + break; + } + break; + } +} + int killall(struct mxq_server *server, int sig, unsigned int pgrp) { struct mxq_user_list *ulist; diff --git a/mxqd.h b/mxqd.h index ad038e47..8563b752 100644 --- a/mxqd.h +++ b/mxqd.h @@ -10,6 +10,19 @@ #include +enum job_killevent { + KILLEVENT_CHECK, // time passed, check timeouts + KILLEVENT_OVERTIME, // job is over time + KILLEVENT_OVERMEMORY, // job is over memory + KILLEVENT_CANCEL, // job is cancelled (user or non-gracefull server shutdown) +}; + +enum job_killstate { + KILLSTATE_RUNNING = 0, // (initial) + KILLSTATE_WAIT_TERM, // (after overtime) XCPU has been sent, waiting for timeout to send TERM and KILL + KILLSTATE_WAIT_KILL, // TERM has been sent, waiting for timeout to send (next) KILL +}; + struct mxq_job_list { struct mxq_group_list *group; struct mxq_job_list *next; @@ -17,6 +30,8 @@ struct mxq_job_list { struct mxq_job job; unsigned long long int max_sumrss; + enum job_killstate killstate; + time_t next_signal_at_uptime_seconds; }; struct mxq_group_list {