Skip to content

Commit

Permalink
mxqd: Use external reaper
Browse files Browse the repository at this point in the history
Call the external helper instead of running the reaper process in the
memory cloned from the mxq main process.

- init_child_process() was used to initialize the user process forked from
  the reaper. We now do the same things in the reaper process itself,
  because we no longer fork the user process in this program; the
  external reaper image will do that. The settings we now apply to the
  reaper process will be inherited by the user process.
- As before we need to change our effective user ident before we chdir
  into the cwd of the job and open the output files for the user. But we
  keep the real user ident, so that we can change back to root later.
- user_process() now exec()s the external helper with the required
  arguments instead of the user image directly. Before we do so, we need
  to change our UIDs back to root. The external helper needs privileges
  to write the spool file.
- The functionality to wait for the user process and write the spool
  file is removed; this is now done by the helper.
- In the absence of errors, the function reaper_process() will no longer
  return.

Note: We don't free new_argv in user_process, because we will exec() or
_exit() anyway.
  • Loading branch information
donald committed Apr 20, 2022
1 parent 521f3a9 commit d2d095a
Showing 1 changed file with 29 additions and 92 deletions.
121 changes: 29 additions & 92 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job)
return 0;
}

res = setreuid(group->user_uid, group->user_uid);
res = setreuid(-1, group->user_uid);
if (res == -1) {
mx_log_err("job=%s(%d):%lu:%lu setreuid(%d, %d) failed: %m",
group->user_name, group->user_uid, group->group_id, job->job_id,
Expand Down Expand Up @@ -1098,7 +1098,7 @@ static int mxq_redirect_input(char *stdin_fname)
return 1;
}

static int user_process(struct mxq_group_list *glist, struct mxq_job *job)
static int user_process(struct mxq_server *server, struct mxq_group_list *glist, struct mxq_job *job)
{
int res;
char **argv;
Expand Down Expand Up @@ -1146,7 +1146,30 @@ static int user_process(struct mxq_group_list *glist, struct mxq_job *job)
return -errno;
}

res = execvp(argv[0], argv);
int argc = 0;
while (argv[argc] != NULL)
argc++;

char **new_argv = mx_calloc_forever(argc+4+1, sizeof(char *));
new_argv[0] = LIBEXECDIR "/mxq/mxq_reaper";
new_argv[1] = mx_asprintf_forever("%d", group->user_uid);
new_argv[2] = mx_asprintf_forever("%s/%lu.stat", server->finished_jobsdir, job->job_id);
new_argv[3] = "--";
for (int i = 0; i < argc ; i++)
new_argv[i+4] = argv[i];
new_argv[argc+4] = NULL;

res = setuid(0);
if (res == -1) {
mx_log_err("job=%s(%d):%lu:%lu setuid(0) failed: %m",
group->user_name,
group->user_uid,
group->group_id,
job->job_id);
return -errno;
}

res = execvp(new_argv[0], new_argv);
mx_log_err("job=%s(%d):%lu:%lu execvp(\"%s\", ...): %m",
group->user_name,
group->user_uid,
Expand All @@ -1169,16 +1192,6 @@ static int is_reaper(pid_t pid) {
}

static int reaper_process(struct mxq_server *server,struct mxq_group_list *glist, struct mxq_job *job) {
pid_t pid;
struct rusage rusage;
int status = 0;
pid_t waited_pid;
int waited_status;
struct timeval now;
struct timeval realtime;
_mx_cleanup_free_ char *finished_job_filename=NULL;
_mx_cleanup_free_ char *finished_job_tmpfilename=NULL;
FILE *out;
int res;

struct mxq_group *group;
Expand All @@ -1203,85 +1216,9 @@ static int reaper_process(struct mxq_server *server,struct mxq_group_list *glist
return res;
}

pid = fork();
if (pid < 0) {
mx_log_err("fork: %m");
return pid;
} else if (pid == 0) {
mx_log_debug("starting user process.");
res = user_process(glist, job);
_exit(EX__MAX+1);
}
gettimeofday(&job->stats_starttime, NULL);

while (1) {
waited_pid = wait(&waited_status);
if (waited_pid < 0) {
if (errno==ECHILD) {
break;
} else {
mx_log_warning("reaper: wait: %m");
sleep(1);
}
}
if (waited_pid == pid) {
status = waited_status;
}
}
gettimeofday(&now, NULL);
timersub(&now, &job->stats_starttime, &realtime);

if (realtime.tv_sec<30) {
int wait=30-realtime.tv_sec;
mx_log_warning("user process finished to fast (%ld seconds) : delaying termination for %d seconds",realtime.tv_sec,wait);
sleep(wait);
}

res = getrusage(RUSAGE_CHILDREN, &rusage);
if (res < 0) {
mx_log_err("reaper: getrusage: %m");
return(res);
}

finished_job_filename = mx_asprintf_forever("%s/%lu.stat", server->finished_jobsdir, job->job_id);
finished_job_tmpfilename = mx_asprintf_forever("%s.tmp", finished_job_filename);

out=fopen(finished_job_tmpfilename,"w");
if (!out) {
mx_log_fatal("%s: %m",finished_job_tmpfilename);
return (-errno);
}

fprintf(out,"1 %d %d %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld\n",
getpid(),
status,
realtime.tv_sec,realtime.tv_usec,
rusage.ru_utime.tv_sec,rusage.ru_utime.tv_usec,
rusage.ru_stime.tv_sec,rusage.ru_stime.tv_usec,
rusage.ru_maxrss,
rusage.ru_ixrss,
rusage.ru_idrss,
rusage.ru_isrss,
rusage.ru_minflt,
rusage.ru_majflt,
rusage.ru_nswap,
rusage.ru_inblock,
rusage.ru_oublock,
rusage.ru_msgsnd,
rusage.ru_msgrcv,
rusage.ru_nsignals,
rusage.ru_nvcsw,
rusage.ru_nivcsw
);
fflush(out);
fsync(fileno(out));
fclose(out);
res=rename(finished_job_tmpfilename,finished_job_filename);
if (res<0) {
mx_log_fatal("rename %s: %m",finished_job_tmpfilename);
return(res);
}
return(0);
res = user_process(server, glist, job);
mx_log_err("user process:: %m");
return res;
}

static unsigned long start_job(struct mxq_group_list *glist)
Expand Down

0 comments on commit d2d095a

Please sign in to comment.