Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
mxqd: Add killstate state machine
When we kill a job because it is over time, over memory or has been
cancelled, we want to send a sequence of different signals.  Currently
this is implemented for "over memory" and "over time" only and with
two different mechanisms. "cancel job" currently sends SIGTERM only so
that some jobs are not correctly terminated.

Unify the mechanism in a state machine. The state is kept in two new
fields `killstate` and `next_signal_at_uptime_seconds` in the server's
job structures.

We mimic the current kill sequences but add SIGKILL for canceled jobs
after 30 seconds if they don't go away from SIGTERM.

After we've sent an initial SIGKILL, we will continue to send more
SIGKILLs to terminate any child we might have missed.

The state machine uses ppidcache to send the signals to all descendants
of the job's host_pid (which is the pid of the reaper).

With this commit, the state machine is added but not yet used.
  • Loading branch information
donald committed Aug 24, 2021
1 parent 74ebbc3 commit 2ac8098
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 0 deletions.
1 change: 1 addition & 0 deletions Makefile
Expand Up @@ -534,6 +534,7 @@ clean: CLEAN += ppidcache.o
mxqd: mx_flock.o
mxqd: mx_util.o
mxqd: mx_proc.o
mxqd: ppidcache.o
mxqd: mx_log.o
mxqd: mxq_log.o
mxqd: mx_getopt.o
Expand Down
143 changes: 143 additions & 0 deletions mxqd.c
Expand Up @@ -45,6 +45,7 @@
#include "mxqd_control.h"
#include "keywordset.h"
#include "parser.tab.h"
#include "ppidcache.h"

#ifndef MXQ_INITIAL_PATH
# define MXQ_INITIAL_PATH "/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin"
Expand Down Expand Up @@ -1650,6 +1651,148 @@ void server_close(struct mxq_server *server)
server_free(server);
}

/*
 * Callback handed to ppidcache_do_descendants(): deliver the signal number
 * stored in *data (an int) to the process `pid`.
 *
 * Every signal other than SIGKILL is preceded by a SIGCONT to the same pid
 * (presumably so that a stopped process gets a chance to act on the signal).
 * The results of kill() are not inspected.
 *
 * Always returns 1. How the caller interprets that value is defined by
 * ppidcache_do_descendants() and is not visible here.
 */
static int signal_descendants_cb(void *data, pid_t pid) {
    const int signo = *(int *)data;

    if (signo == SIGKILL) {
        kill(pid, SIGKILL);
    } else {
        kill(pid, SIGCONT);
        kill(pid, signo);
    }
    return 1;
}

/*
 * Run signal_descendants_cb() through ppidcache_do_descendants() for `pid`,
 * so that `signal` is delivered to the processes that function visits.
 */
static void signal_descendants(struct ppidcache *ppidcache, pid_t pid, int signal)
{
    /* The callback receives the signal number through its data pointer. */
    int signo = signal;

    ppidcache_do_descendants(ppidcache, pid, signal_descendants_cb, &signo);
}

static void signal_job(struct ppidcache *ppidcache, struct mxq_job_list *jlist, int signal)
{
mx_log_info("sending signal=%d to job=%s(%d):%lu:%lu",
signal,
jlist->group->group.user_name, jlist->group->group.user_uid,
jlist->group->group.group_id, jlist->job.job_id);

signal_descendants(ppidcache, jlist->job.host_pid, signal);
}

/*
* State machine for "kill" events to jobs.
*
* Signals to be sent:
*
* job is over time   : SIGXCPU , after +5% group-time SIGTERM , after 10% group time + 10 minutes SIGKILL
* job is over memory : SIGTERM , after 10 seconds SIGKILL
* job is cancelled : SIGTERM , after 30 seconds SIGKILL
*
* Once KILL is sent, this is repeated every 30 seconds to terminate any child we might have missed.
*
* Events:
*
* CHECK : time passed, check timeouts
* OVERTIME : job is over time
* OVERMEMORY : job is over memory
* CANCEL : job is cancelled (user or non-graceful server shutdown)
*
* States:
*
* RUNNING : (initial)
* WAIT_TERM : (after overtime) XCPU has been sent, waiting for timeout to send TERM and KILL
* WAIT_KILL : TERM has been sent, waiting for timeout to send (next) KILL
* */

static void killstate_event(struct ppidcache *ppidcache, struct mxq_job_list *jlist, enum job_killevent event)
{
time_t uptime_seconds = mx_clock_boottime();

switch (jlist->killstate) {
case KILLSTATE_RUNNING:
switch (event) {
case KILLEVENT_CHECK:
break;
case KILLEVENT_OVERTIME:
mx_log_info("job=%s(%d):%lu:%lu exceeded time limit",
jlist->group->group.user_name, jlist->group->group.user_uid,
jlist->group->group.group_id, jlist->job.job_id);
signal_job(ppidcache, jlist, SIGXCPU);
jlist->killstate = KILLSTATE_WAIT_TERM;
jlist->next_signal_at_uptime_seconds = uptime_seconds + jlist->group->group.job_time * 3; // 0.05 * job_time*60
break;
case KILLEVENT_OVERMEMORY:
mx_log_info("job=%s(%d):%lu:%lu exceeded memory limit",
jlist->group->group.user_name, jlist->group->group.user_uid,
jlist->group->group.group_id, jlist->job.job_id);
signal_job(ppidcache, jlist, SIGTERM);
jlist->killstate = KILLSTATE_WAIT_KILL;
jlist->next_signal_at_uptime_seconds = uptime_seconds + 10;
break;
case KILLEVENT_CANCEL:
mx_log_info("job=%s(%d):%lu:%lu cancelled",
jlist->group->group.user_name, jlist->group->group.user_uid,
jlist->group->group.group_id, jlist->job.job_id);
signal_job(ppidcache, jlist, SIGTERM);
jlist->killstate = KILLSTATE_WAIT_KILL;
jlist->next_signal_at_uptime_seconds = uptime_seconds + 30;
break;
}
break;
case KILLSTATE_WAIT_TERM:
switch (event) {
case KILLEVENT_CHECK:
if (uptime_seconds >= jlist->next_signal_at_uptime_seconds) {
signal_job(ppidcache, jlist, SIGTERM);
jlist->killstate = KILLSTATE_WAIT_KILL;
jlist->next_signal_at_uptime_seconds = uptime_seconds + jlist->group->group.job_time * 6 + 600; // 0.10 * job_time*60 + 10*60
}
break;
case KILLEVENT_OVERTIME:
break;
case KILLEVENT_OVERMEMORY:
mx_log_info("job=%s(%d):%lu:%lu exceeded memory limit",
jlist->group->group.user_name, jlist->group->group.user_uid,
jlist->group->group.group_id, jlist->job.job_id);
signal_job(ppidcache, jlist, SIGTERM);
jlist->killstate = KILLSTATE_WAIT_KILL;
jlist->next_signal_at_uptime_seconds = uptime_seconds + 10;
break;
case KILLEVENT_CANCEL:
mx_log_info("job=%s(%d):%lu:%lu cancelled",
jlist->group->group.user_name, jlist->group->group.user_uid,
jlist->group->group.group_id, jlist->job.job_id);
signal_job(ppidcache, jlist, SIGTERM);
jlist->killstate = KILLSTATE_WAIT_KILL;
jlist->next_signal_at_uptime_seconds = uptime_seconds + 30;
break;
}
break;
case KILLSTATE_WAIT_KILL:
switch (event) {
case KILLEVENT_CHECK:
if (uptime_seconds >= jlist->next_signal_at_uptime_seconds) {
signal_job(ppidcache, jlist, SIGKILL);
jlist->next_signal_at_uptime_seconds = uptime_seconds + 30;
}
break;
case KILLEVENT_OVERTIME:
break;
case KILLEVENT_OVERMEMORY:
if (jlist->next_signal_at_uptime_seconds > uptime_seconds + 10) {
mx_log_info("job=%s(%d):%lu:%lu exceeded memory limit",
jlist->group->group.user_name, jlist->group->group.user_uid,
jlist->group->group.group_id, jlist->job.job_id);
jlist->next_signal_at_uptime_seconds = uptime_seconds + 10;
}
break;
case KILLEVENT_CANCEL:
if (jlist->next_signal_at_uptime_seconds > uptime_seconds + 30) {
mx_log_info("job=%s(%d):%lu:%lu cancelled",
jlist->group->group.user_name, jlist->group->group.user_uid,
jlist->group->group.group_id, jlist->job.job_id);
}
break;
}
break;
}
}

int killall(struct mxq_server *server, int sig, unsigned int pgrp)
{
struct mxq_user_list *ulist;
Expand Down
15 changes: 15 additions & 0 deletions mxqd.h
Expand Up @@ -10,13 +10,28 @@

#include <sched.h>

/* Events that can be fed into the per-job kill state machine. */
enum job_killevent {
    /* time passed, check timeouts */
    KILLEVENT_CHECK      = 0,
    /* job is over time */
    KILLEVENT_OVERTIME   = 1,
    /* job is over memory */
    KILLEVENT_OVERMEMORY = 2,
    /* job is cancelled (user or non-graceful server shutdown) */
    KILLEVENT_CANCEL     = 3,
};

/* States of the per-job kill state machine. */
enum job_killstate {
    /* (initial) */
    KILLSTATE_RUNNING   = 0,
    /* (after overtime) XCPU has been sent, waiting for timeout to send TERM and KILL */
    KILLSTATE_WAIT_TERM = 1,
    /* TERM has been sent, waiting for timeout to send (next) KILL */
    KILLSTATE_WAIT_KILL = 2,
};

/* Server-side bookkeeping entry for one job. */
struct mxq_job_list {
/* group list entry this job refers to */
struct mxq_group_list *group;
/* presumably the next entry of a singly linked list of jobs -- TODO confirm */
struct mxq_job_list *next;

/* the job data itself, embedded by value */
struct mxq_job job;

unsigned long long int max_sumrss;
/* current state of the kill state machine (enum job_killstate) */
enum job_killstate killstate;
/* absolute deadline for the next signal; the state machine compares it against mx_clock_boottime() */
time_t next_signal_at_uptime_seconds;
};

struct mxq_group_list {
Expand Down

0 comments on commit 2ac8098

Please sign in to comment.