Skip to content

Commit

Permalink
mxqd: Switch to killstate state machine
Browse files Browse the repository at this point in the history
Let the killall_XXX functions use the killstate state machine.

`killall` needs to set up its own ppidcache. This is not relevant
performance-wise, as it is used only once on ungraceful server shutdown.

The functions `killall_over_time`, `killall_over_memory` and
`killall_cancelled`, on the other hand, use the ppidcache object from
monitor_jobs.

Remove all logic to produce sequences of signals from these functions,
because this is done by the state machine. The change will also make
`killall_cancelled` send a SIGKILL after a SIGTERM if needed.

Remove logging from the functions, which is also done by the state
machine.

Produce KILLEVENT_CHECK events for all jobs in monitor_jobs() to keep
the state machine going as time passes.
  • Loading branch information
donald committed Aug 24, 2021
1 parent 5df91cd commit a332670
Showing 1 changed file with 18 additions and 66 deletions.
84 changes: 18 additions & 66 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1800,27 +1800,20 @@ int killall(struct mxq_server *server)
struct mxq_job_list *jlist;

struct mxq_group *group;
struct mxq_job *job;

pid_t pid;
struct ppidcache *ppidcache = ppidcache_new();
ppidcache_scan(ppidcache);

assert(server);

for (ulist = server->users; ulist; ulist = ulist->next) {
for (glist = ulist->groups; glist; glist = glist->next) {
group = &glist->group;

for (jlist = glist->jobs; jlist; jlist = jlist->next) {
job = &jlist->job;
pid = job->host_pid;
mx_log_info("Sending signal=%d to job=%s(%d):%lu:%lu pgrp=%d",
SIGTERM,
group->user_name, group->user_uid, group->group_id, job->job_id,
pid);
kill(-pid, SIGTERM);
}
for (jlist = glist->jobs; jlist; jlist = jlist->next)
killstate_event(ppidcache, jlist, KILLEVENT_CANCEL);
}
}
ppidcache_free(ppidcache);
return 0;
}

Expand All @@ -1836,8 +1829,6 @@ int killall_over_time(struct ppidcache *ppidcache, struct mxq_server *server)
struct timeval now;
struct timeval delta;

pid_t pid;

assert(server);

if (!server->jobs_running)
Expand All @@ -1862,27 +1853,7 @@ int killall_over_time(struct ppidcache *ppidcache, struct mxq_server *server)
if (delta.tv_sec <= group->job_time*60)
continue;

pid = job->host_pid;

mx_log_info("killall_over_time(): Sending signal=XCPU to job=%s(%d):%lu:%lu pgrp=%d",
group->user_name, group->user_uid, group->group_id, job->job_id, pid);
kill(-pid, SIGCONT);
kill(-pid, SIGXCPU);

if (delta.tv_sec <= group->job_time*63)
continue;

mx_log_info("killall_over_time(): Sending signal=TERM to job=%s(%d):%lu:%lu pgrp=%d",
group->user_name, group->user_uid, group->group_id, job->job_id, pid);
kill(-pid, SIGCONT);
kill(-pid, SIGTERM);

if (delta.tv_sec <= group->job_time*66+60*10)
continue;

mx_log_info("killall_over_time(): Sending signal=KILL to job=%s(%d):%lu:%lu pgrp=%d",
group->user_name, group->user_uid, group->group_id, job->job_id, pid);
kill(-pid, SIGKILL);
killstate_event(ppidcache, jlist, KILLEVENT_OVERTIME);
}
}
}
Expand Down Expand Up @@ -1930,16 +1901,9 @@ int killall_over_memory(struct ppidcache *ppidcache, struct mxq_server *server)

for (jlist = glist->jobs; jlist; jlist = jlist->next) {
unsigned long long int memory;
int signal;

job = &jlist->job;

/* sigterm has already been send last round ? */
if (jlist->max_sumrss/1024 > group->job_memory)
signal = SIGKILL;
else
signal = SIGTERM;

pinfo = mx_proc_tree_proc_info(ptree, job->host_pid);
if (!pinfo) {
mx_log_warning("killall_over_memory(): Can't find process with pid %u in process tree",
Expand All @@ -1955,18 +1919,7 @@ int killall_over_memory(struct ppidcache *ppidcache, struct mxq_server *server)
if (jlist->max_sumrss/1024 <= group->job_memory)
continue;

mx_log_info("killall_over_memory(): used(%lluMiB) > requested(%luMiB): Sending signal=%d to job=%s(%d):%lu:%lu pgrp=%d",
jlist->max_sumrss/1024,
group->job_memory,
signal,
group->user_name,
group->user_uid,
group->group_id,
job->job_id,
job->host_pid);

kill(-job->host_pid, SIGCONT);
kill(-job->host_pid, signal);
killstate_event(ppidcache, jlist, KILLEVENT_OVERMEMORY);
}
}
}
Expand All @@ -1983,8 +1936,6 @@ int killall_cancelled(struct ppidcache *ppidcache, struct mxq_server *server)
struct mxq_group *group;
struct mxq_job *job;

pid_t pid;

assert(server);

for (ulist = server->users; ulist; ulist = ulist->next) {
Expand All @@ -2000,16 +1951,7 @@ int killall_cancelled(struct ppidcache *ppidcache, struct mxq_server *server)

for (jlist = glist->jobs; jlist; jlist = jlist->next) {
job = &jlist->job;

pid = job->host_pid;
mx_log_info(" Sending signal=TERM to job=%s(%d):%lu:%lu pgrp=%d",
group->user_name,
group->user_uid,
group->group_id,
job->job_id,
pid);
kill(-pid, SIGCONT);
kill(-pid, SIGTERM);
killstate_event(ppidcache, jlist, KILLEVENT_CANCEL);
}
}
}
Expand Down Expand Up @@ -2755,11 +2697,21 @@ static void update_status(struct mxq_server *server)

/*
 * One supervision pass over all jobs known to the server.
 *
 * Creates and scans a ppidcache, passes it to the cancelled / over-time /
 * over-memory checks, and afterwards sends KILLEVENT_CHECK for every job
 * so that the killstate state machine keeps going as time passes. The
 * ppidcache is handed to ppidcache_free() before returning.
 */
static void monitor_jobs(struct mxq_server *server)
{
    struct ppidcache *cache = ppidcache_new();
    struct mxq_user_list *user;
    struct mxq_group_list *grp;
    struct mxq_job_list *entry;

    ppidcache_scan(cache);

    /* Each check shares the same scan taken above. */
    killall_cancelled(cache, server);
    killall_over_time(cache, server);
    killall_over_memory(cache, server);

    /* Walk users -> groups -> jobs and send a CHECK event to every job. */
    for (user = server->users; user; user = user->next) {
        for (grp = user->groups; grp; grp = grp->next) {
            for (entry = grp->jobs; entry; entry = entry->next) {
                killstate_event(cache, entry, KILLEVENT_CHECK);
            }
        }
    }

    ppidcache_free(cache);
}

Expand Down

0 comments on commit a332670

Please sign in to comment.