Skip to content

Commit

Permalink
mxqd: Use synchronous signals
Browse files Browse the repository at this point in the history
We intend to expand the functionality which can be triggered
by signals, e.g. switch log level to debug or trigger a state
dump.

By processing signals synchronously, we are free to call
non-reentrant functions and know that our own data is not
in a transient state.

Block asynchronous signals and receive and process signals explicitly.

Re-enable the asynchronous signals when the user process is initialized.
  • Loading branch information
donald committed Jul 5, 2017
1 parent d99d562 commit ff2f49f
Showing 1 changed file with 46 additions and 36 deletions.
82 changes: 46 additions & 36 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ volatile sig_atomic_t global_sigterm_cnt=0;
volatile sig_atomic_t global_sigquit_cnt=0;
volatile sig_atomic_t global_sigrestart_cnt=0;

static sigset_t all_signals;

int mxq_redirect_output(char *stdout_fname, char *stderr_fname);
void server_free(struct mxq_server *server);
Expand Down Expand Up @@ -734,6 +735,8 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job)

reset_signals();

sigprocmask(SIG_UNBLOCK,&all_signals,NULL);

passwd = getpwuid(group->user_uid);
if (!passwd) {
mx_log_err("job=%s(%d):%lu:%lu getpwuid(): %m",
Expand Down Expand Up @@ -2303,29 +2306,31 @@ int recover_from_previous_crash(struct mxq_server *server)
return res;
}

/**********************************************************************/
static void no_handler(int sig) {}

static void sig_handler(int sig)
static void process_signal(struct mxq_server *server,int sig,int extra)
{
if (sig == SIGINT) {
global_sigint_cnt++;
return;
}

if (sig == SIGTERM) {
global_sigterm_cnt++;
return;
}

if (sig == SIGQUIT) {
global_sigquit_cnt++;
return;
}

if (sig == SIGUSR1) {
global_sigrestart_cnt++;
return;
switch (sig) {
case SIGINT:
mx_log_info("received sigint");
global_sigint_cnt++;
break;
case SIGTERM:
mx_log_info("received sigterm");
global_sigterm_cnt++;
break;
case SIGQUIT:
mx_log_info("received sigquit");
global_sigquit_cnt++;
break;
case SIGUSR1:
mx_log_info("received sigusr2");
global_sigrestart_cnt++;
break;
case SIGCHLD:
mx_log_info("received sigchld");
break;
default:
mx_log_warning("received signal %d (unexpected!)",sig);
break;
}
}

Expand All @@ -2342,11 +2347,14 @@ int main(int argc, char *argv[])

int res;
int fail = 0;
struct timespec poll_interval={0,0};
siginfo_t siginfo;

int saved_argc;
_mx_cleanup_free_ char *saved_argv_str = NULL;
_mx_cleanup_free_ char *saved_cwd = NULL;

sigfillset(&all_signals);

/*** server init ***/

Expand Down Expand Up @@ -2391,14 +2399,7 @@ int main(int argc, char *argv[])

/*** main loop ***/

signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
signal(SIGQUIT, sig_handler);
signal(SIGUSR1, sig_handler);
signal(SIGTSTP, SIG_IGN);
signal(SIGTTIN, SIG_IGN);
signal(SIGTTOU, SIG_IGN);
signal(SIGCHLD, no_handler);
sigprocmask(SIG_BLOCK,&all_signals,NULL);

res = recover_from_previous_crash(server);
if (res < 0) {
Expand All @@ -2412,6 +2413,11 @@ int main(int argc, char *argv[])
server_dump(server);

while (!global_sigint_cnt && !global_sigterm_cnt && !global_sigquit_cnt && !global_sigrestart_cnt && !fail) {
mx_log_debug("main loop - wait for signals max %ld sec",poll_interval.tv_sec);
res=sigtimedwait(&all_signals,&siginfo,&poll_interval);
if (res>0)
process_signal(server,res,siginfo.si_int);

slots_returned = catchall(server);
slots_returned += fspool_scan(server);

Expand All @@ -2436,7 +2442,7 @@ int main(int argc, char *argv[])
assert(!group_cnt);
mxq_daemon_set_status(server->mysql, daemon, MXQ_DAEMON_STATUS_IDLE);
mx_log_info("Nothing to do. Sleeping for a short while. (1 second)");
sleep(1);
poll_interval.tv_sec=1;
continue;
}

Expand All @@ -2449,16 +2455,16 @@ int main(int argc, char *argv[])
mxq_daemon_set_status(server->mysql, daemon, MXQ_DAEMON_STATUS_FULL);
}
mx_log_info("All slots running. Sleeping for a short while (7 seconds).");
sleep(7);
poll_interval.tv_sec=7;
continue;
}

slots_started = start_user_with_least_running_global_slot_count(server);
if (slots_started == -1) {
mxq_daemon_set_status(server->mysql, daemon, MXQ_DAEMON_STATUS_WAITING);
mx_log_info("no slots_started => we have users waiting for free slots. Sleeping (3 seconds).");
sleep(3);
slots_started = 0;
poll_interval.tv_sec=3;
continue;
} else if (slots_started) {
mx_log_info("slots_started=%lu :: Main Loop started %lu slots.", slots_started, slots_started);
Expand All @@ -2468,11 +2474,11 @@ int main(int argc, char *argv[])
if (!server->jobs_running) {
mxq_daemon_set_status(server->mysql, daemon, MXQ_DAEMON_STATUS_IDLE);
mx_log_info("Tried Hard and nobody is doing anything. Sleeping for a long while (15 seconds).");
sleep(15);
poll_interval.tv_sec=15;
} else {
mxq_daemon_set_status(server->mysql, daemon, MXQ_DAEMON_STATUS_RUNNING);
mx_log_info("Tried Hard. But have done nothing. Sleeping for a very short while (3 seconds).");
sleep(3);
poll_interval.tv_sec=3;
}
continue;
}
Expand All @@ -2488,6 +2494,7 @@ int main(int argc, char *argv[])
/* while not quitting and not restarting -> wait for and collect all running jobs */

mxq_daemon_set_status(server->mysql, daemon, MXQ_DAEMON_STATUS_TERMINATING);
poll_interval.tv_sec=1;
while (server->jobs_running && !global_sigquit_cnt && !global_sigrestart_cnt && !fail) {
slots_returned = catchall(server);
slots_returned += fspool_scan(server);
Expand Down Expand Up @@ -2515,7 +2522,10 @@ int main(int argc, char *argv[])
server->jobs_running,
global_sigint_cnt,
global_sigterm_cnt);
sleep(1);
mx_log_debug("termination loop - wait for signals max %ld sec",poll_interval.tv_sec);
res=sigtimedwait(&all_signals,&siginfo,&poll_interval);
if (res>0)
process_signal(server,res,siginfo.si_int);
}

mxq_daemon_shutdown(server->mysql, daemon);
Expand Down

0 comments on commit ff2f49f

Please sign in to comment.