From 41e4282272d3f1645a6e66099e6013afdcaa702a Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sun, 18 Oct 2015 20:00:34 +0200 Subject: [PATCH 01/24] mxqd: refactor (add user_process) --- mxqd.c | 81 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 43 insertions(+), 38 deletions(-) diff --git a/mxqd.c b/mxqd.c index 15fa51d..7eb5f8d 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1142,6 +1142,47 @@ int mxq_redirect_input(char *stdin_fname) return 1; } +int user_process(struct mxq_group_list *group,struct mxq_job *mxqjob) +{ + int res; + char **argv; + + res = init_child_process(group, mxqjob); + if (!res) + return(-1); + + mxq_job_set_tmpfilenames(&group->group, mxqjob); + + res = mxq_redirect_input("/dev/null"); + if (res < 0) { + mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_input() failed (%d): %m", + group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob->job_id, + res); + return(res); + } + + res = mxq_redirect_output(mxqjob->tmp_stdout, mxqjob->tmp_stderr); + if (res < 0) { + mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_output() failed (%d): %m", + group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob->job_id, + res); + return(res); + } + + argv = mx_strvec_from_str(mxqjob->job_argv_str); + if (!argv) { + mx_log_err("job=%s(%d):%lu:%lu Can't recaculate commandline. 
str_to_strvev(%s) failed: %m", + group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob->job_id, + mxqjob->job_argv_str); + return(-errno); + } + + res=execvp(argv[0], argv); + mx_log_err("job=%s(%d):%lu:%lu execvp(\"%s\", ...): %m", + group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob->job_id, + argv[0]); + return(res); +} unsigned long start_job(struct mxq_group_list *group) { @@ -1150,7 +1191,6 @@ unsigned long start_job(struct mxq_group_list *group) struct mxq_job_list *job; pid_t pid; int res; - char **argv; assert(group); assert(group->user); @@ -1182,43 +1222,8 @@ unsigned long start_job(struct mxq_group_list *group) group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, mxqjob.host_pid, getpgrp()); - res = init_child_process(group, &mxqjob); - if (!res) - _exit(EX__MAX + 1); - - mxq_job_set_tmpfilenames(&group->group, &mxqjob); - - - res = mxq_redirect_input("/dev/null"); - if (res < 0) { - mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_input() failed (%d): %m", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, - res); - _exit(EX__MAX + 1); - } - - res = mxq_redirect_output(mxqjob.tmp_stdout, mxqjob.tmp_stderr); - if (res < 0) { - mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_output() failed (%d): %m", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, - res); - _exit(EX__MAX + 1); - } - - - argv = mx_strvec_from_str(mxqjob.job_argv_str); - if (!argv) { - mx_log_err("job=%s(%d):%lu:%lu Can't recaculate commandline. 
str_to_strvev(%s) failed: %m", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, - mxqjob.job_argv_str); - _exit(EX__MAX + 1); - } - - execvp(argv[0], argv); - mx_log_err("job=%s(%d):%lu:%lu execvp(\"%s\", ...): %m", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, - argv[0]); - _exit(EX__MAX + 1); + res=user_process(group,&mxqjob); + exit(res<0 ? 1 : 0); } gettimeofday(&mxqjob.stats_starttime, NULL); From 155adc614523ed2cb4a83bf0bd6d70c47b93c905 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Fri, 23 Oct 2015 10:26:57 +0200 Subject: [PATCH 02/24] mxqd: refactor (add job_has_finished) --- mxqd.c | 72 ++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/mxqd.c b/mxqd.c index 7eb5f8d..9dc59fd 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1720,6 +1720,47 @@ int killallcancelled(struct mxq_server *server, int sig, unsigned int pgrp) return 0; } +static int job_has_finished (struct mxq_server *server,struct mxq_group *g,struct mxq_job_list *job) +{ + int cnt=0; + int res; + struct mxq_job *j=&job->job; + + mxq_set_job_status_exited(server->mysql, j); + + if (j->job_status == MXQ_JOB_STATUS_FINISHED) { + g->group_jobs_finished++; + } else if(j->job_status == MXQ_JOB_STATUS_FAILED) { + g->group_jobs_failed++; + } else if(j->job_status == MXQ_JOB_STATUS_KILLED) { + g->group_jobs_failed++; + } + + mxq_job_set_tmpfilenames(g, j); + + if (!mx_streq(j->job_stdout, "/dev/null")) { + res = rename(j->tmp_stdout, j->job_stdout); + if (res == -1) { + mx_log_err(" job=%s(%d):%lu:%lu host_pid=%d :: rename(stdout) failed: %m", + g->user_name, g->user_uid, g->group_id, j->job_id, j->host_pid); + } + } + + if (!mx_streq(j->job_stderr, "/dev/null") && !mx_streq(j->job_stderr, j->job_stdout)) { + res = rename(j->tmp_stderr, j->job_stderr); + if (res == -1) { + mx_log_err(" job=%s(%d):%lu:%lu host_pid=%d :: rename(stderr) failed: %m", + g->user_name, 
g->user_uid, g->group_id, j->job_id, j->host_pid); + } + } + + cnt += job->group->slots_per_job; + cpuset_clear_running(&server->cpu_set_running,&j->host_cpu_set); + mxq_job_free_content(j); + free(job); + return cnt; +} + int catchall(struct mxq_server *server) { struct rusage rusage; @@ -1798,38 +1839,9 @@ int catchall(struct mxq_server *server) { mx_log_info(" job=%s(%d):%lu:%lu host_pid=%d stats_status=%d :: child process returned.", g->user_name, g->user_uid, g->group_id, j->job_id, pid, status); - mxq_set_job_status_exited(server->mysql, j); - - if (j->job_status == MXQ_JOB_STATUS_FINISHED) { - g->group_jobs_finished++; - } else if(j->job_status == MXQ_JOB_STATUS_FAILED) { - g->group_jobs_failed++; - } else if(j->job_status == MXQ_JOB_STATUS_KILLED) { - g->group_jobs_failed++; - } - mxq_job_set_tmpfilenames(g, j); + cnt+=job_has_finished(server,g,job); - if (!mx_streq(j->job_stdout, "/dev/null")) { - res = rename(j->tmp_stdout, j->job_stdout); - if (res == -1) { - mx_log_err(" job=%s(%d):%lu:%lu host_pid=%d :: rename(stdout) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id, pid); - } - } - - if (!mx_streq(j->job_stderr, "/dev/null") && !mx_streq(j->job_stderr, j->job_stdout)) { - res = rename(j->tmp_stderr, j->job_stderr); - if (res == -1) { - mx_log_err(" job=%s(%d):%lu:%lu host_pid=%d :: rename(stderr) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id, pid); - } - } - - cnt += job->group->slots_per_job; - cpuset_clear_running(&server->cpu_set_running,&j->host_cpu_set); - mxq_job_free_content(j); - free(job); } return cnt; From 4df9cfdd2a69526eab308a1384f8a43601f1b34d Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 22 Oct 2015 18:03:54 +0200 Subject: [PATCH 03/24] mx_util: add mx_mkdir_p --- mx_util.c | 31 +++++++++++++++++++++++++++++-- mx_util.h | 3 +++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/mx_util.c b/mx_util.c index f2a74db..a78dc6b 100644 --- a/mx_util.c +++ b/mx_util.c @@ -11,8 +11,8 @@ 
#include -//#include -//#include +#include +#include #include #include "mx_log.h" @@ -1272,3 +1272,30 @@ char *mx_cpuset_to_str(cpu_set_t* cpuset_ptr) mx_strvec_free(strvec); return out; } + +int mx_mkdir_p(char *path, mode_t mode) +{ + struct stat st; + int err; + char *d; + _mx_cleanup_free_ char *copy = NULL; + + if (stat(path, &st) == 0) + return 0; + + copy=mx_strdup_forever(path); + d=copy; + + while (*++d == '/'); + + while ((d = strchr(d, '/'))) { + *d = '\0'; + err = stat(copy, &st) && mkdir(copy, mode); + *d++ = '/'; + if (err) + return -errno; + while (*d == '/') + ++d; + } + return (stat(copy, &st) && mkdir(copy, mode)) ? -errno : 0; +} diff --git a/mx_util.h b/mx_util.h index 92614fe..c67ded7 100644 --- a/mx_util.h +++ b/mx_util.h @@ -162,4 +162,7 @@ char* mx_strvec_join(char *sep,char **strvec); char* mx_cpuset_to_str(cpu_set_t* cpuset_ptr); int mx_str_to_cpuset(cpu_set_t* cpuset_ptr,char *str); +int mx_mkdir_p(char *path, mode_t mode); + + #endif From bc0c4d7b73eee511342b6a12f2de78db26066d89 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Mon, 26 Oct 2015 15:55:51 +0100 Subject: [PATCH 04/24] make: add FINISHED_JOBSDIR --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index 21c95c5..19998d4 100644 --- a/Makefile +++ b/Makefile @@ -73,12 +73,15 @@ MXQ_MYSQL_DEFAULT_GROUP_DEVELOPMENT = mxqdevel MXQ_INITIAL_PATH = /sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin MXQ_INITIAL_TMPDIR = /tmp +MXQ_FINISHED_JOBSDIR = ${LOCALSTATEDIR}/spool/mxqd + CFLAGS_MXQ_MYSQL_DEFAULT_FILE = -DMXQ_MYSQL_DEFAULT_FILE=\"$(MXQ_MYSQL_DEFAULT_FILE)\" CFLAGS_MXQ_MYSQL_DEFAULT_GROUP = -DMXQ_MYSQL_DEFAULT_GROUP_CLIENT=\"$(MXQ_MYSQL_DEFAULT_GROUP_CLIENT)\" CFLAGS_MXQ_MYSQL_DEFAULT_GROUP += -DMXQ_MYSQL_DEFAULT_GROUP_SERVER=\"$(MXQ_MYSQL_DEFAULT_GROUP_SERVER)\" CFLAGS_MXQ_MYSQL_DEFAULT_GROUP += -DMXQ_MYSQL_DEFAULT_GROUP_DEVELOPMENT=\"$(MXQ_MYSQL_DEFAULT_GROUP_DEVELOPMENT)\" CFLAGS_MXQ_INITIAL_PATH = 
-DMXQ_INITIAL_PATH=\"$(MXQ_INITIAL_PATH)\" CFLAGS_MXQ_INITIAL_TMPDIR = -DMXQ_INITIAL_TMPDIR=\"$(MXQ_INITIAL_TMPDIR)\" +CFLAGS_MXQ_FINISHED_JOBSDIR = -DMXQ_FINISHED_JOBSDIR=\"${MXQ_FINISHED_JOBSDIR}\" MYSQL_CONFIG = mysql_config @@ -417,6 +420,7 @@ mxqd.o: $(mx_mysql.h) mxqd.o: CFLAGS += $(CFLAGS_MYSQL) mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_PATH) mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_TMPDIR) +mxqd.o: CFLAGS += $(CFLAGS_MXQ_FINISHED_JOBSDIR) mxqd.o: CFLAGS += -Wno-unused-but-set-variable clean: CLEAN += mxqd.o From d14cf237713214641dee0d0b180d19a817954528 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 22 Oct 2015 18:18:04 +0200 Subject: [PATCH 05/24] mxqd: create MXQ_FINISHED_JOBSDIR on startup --- mxqd.c | 8 ++++++++ mxqd.h | 1 + 2 files changed, 9 insertions(+) diff --git a/mxqd.c b/mxqd.c index 9dc59fd..7d011e2 100644 --- a/mxqd.c +++ b/mxqd.c @@ -479,6 +479,13 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) exit(2); } + mx_asprintf_forever(&server->finished_jobsdir,"%s/%s",MXQ_FINISHED_JOBSDIR,server->server_id); + res=mx_mkdir_p(server->finished_jobsdir,0700); + if (res<0) { + mx_log_err("MAIN: mkdir %s failed: %m. 
Exiting.",MXQ_FINISHED_JOBSDIR); + exit(EX_IOERR); + } + if (arg_daemonize) { res = daemon(0, 1); if (res == -1) { @@ -1518,6 +1525,7 @@ void server_close(struct mxq_server *server) mx_free_null(server->boot_id); mx_free_null(server->host_id); + mx_free_null(server->finished_jobsdir); } int killall(struct mxq_server *server, int sig, unsigned int pgrp) diff --git a/mxqd.h b/mxqd.h index 4bc1cd0..1259ddd 100644 --- a/mxqd.h +++ b/mxqd.h @@ -81,6 +81,7 @@ struct mxq_server { char *hostname; char *server_id; char *pidfilename; + char *finished_jobsdir; struct mx_flock *flock; char *initial_path; From 7a4ec2193506f030ba93fedcd3d4cc8591ce0626 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Fri, 23 Oct 2015 15:11:52 +0200 Subject: [PATCH 06/24] mxqd: add help functions for fspool (finished job spool directory) --- mxqd.c | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 135 insertions(+), 1 deletion(-) diff --git a/mxqd.c b/mxqd.c index 7d011e2..95d3512 100644 --- a/mxqd.c +++ b/mxqd.c @@ -10,8 +10,9 @@ #include #include #include - +#include #include +#include #include @@ -1769,6 +1770,139 @@ static int job_has_finished (struct mxq_server *server,struct mxq_group *g,struc return cnt; } +static char *fspool_get_filename (struct mxq_server *server,long unsigned int job_id) +{ + char *fspool_filename; + mx_asprintf_forever(&fspool_filename,"%s/%lu.stat",server->finished_jobsdir,job_id); + return fspool_filename; +} + +void fspool_unlink(struct mxq_server *server,int job_id) { + char *fspool_filename=fspool_get_filename(server,job_id); + unlink(fspool_filename); + free(fspool_filename); +} + +static int fspool_process_file(struct mxq_server *server,char *filename,int job_id) { + FILE *in; + int res; + + pid_t pid; + int status; + struct rusage rusage; + struct timeval realtime; + + struct mxq_job_list *job; + struct mxq_job *j; + struct mxq_group *g; + + in=fopen(filename,"r"); + if (!in) { + return -errno; + } + res=fscanf(in,"1 %d %d %ld 
%ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld", + &pid, + &status, + &realtime.tv_sec,&realtime.tv_usec, + &rusage.ru_utime.tv_sec,&rusage.ru_utime.tv_usec, + &rusage.ru_stime.tv_sec,&rusage.ru_stime.tv_usec, + &rusage.ru_maxrss, + &rusage.ru_ixrss, + &rusage.ru_idrss, + &rusage.ru_isrss, + &rusage.ru_minflt, + &rusage.ru_majflt, + &rusage.ru_nswap, + &rusage.ru_inblock, + &rusage.ru_oublock, + &rusage.ru_msgsnd, + &rusage.ru_msgrcv, + &rusage.ru_nsignals, + &rusage.ru_nvcsw, + &rusage.ru_nivcsw); + fclose(in); + if (res!=22) { + mx_log_err("%s : parse error (res=%d)",filename,res); + if (!errno) + errno=EINVAL; + return -errno; + } + + mx_log_info("job finished (via fspool) : job %d pid %d status %d",job_id,pid,status); + + job = server_remove_job_by_pid(server, pid); + if (!job) { + mx_log_warning("fspool_process_file: %s : job unknown on server",filename); + return(-1); + } + j = &job->job; + assert(job->group); + assert(j->job_id=job_id); + g = &job->group->group; + + j->stats_realtime=realtime; + j->stats_status=status; + j->stats_rusage=rusage; + + job_has_finished(server,g,job); + fspool_unlink(server,job_id); + return(0); +} + +static int fspool_is_valid_name_parse(const char *name,int *job_id) { + const char *c=name; + if (!*c) + return 0; + if (!isdigit(*c++)) + return 0; + while(isdigit(*c)) { + c++; + } + if (strcmp(c,".stat")) { + return 0; + } + if (job_id) { + *job_id=atol(name); + } + return 1; +} + +static int fspool_is_valid_name(const struct dirent *d) +{ + return fspool_is_valid_name_parse(d->d_name,NULL); +} + +static int fspool_scan(struct mxq_server *server) { + int cnt=0; + int entries; + struct dirent **namelist; + int i; + int res; + + + entries=scandir(server->finished_jobsdir,&namelist,&fspool_is_valid_name,&alphasort); + if (entries<0) { + mx_log_err("scandir %s: %m",server->finished_jobsdir); + return cnt; + } + + for (i=0;ifinished_jobsdir,namelist[i]->d_name); + 
fspool_is_valid_name_parse(namelist[i]->d_name,&job_id); + res=fspool_process_file(server,filename,job_id); + if (res==0) { + cnt++; + } + free(namelist[i]); + free(filename); + } + + free(namelist); + return cnt; +} + int catchall(struct mxq_server *server) { struct rusage rusage; From 214242aaf8e90dd0dd1435b126697785c277e783 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Fri, 23 Oct 2015 09:31:42 +0200 Subject: [PATCH 07/24] mxqd: add reaper --- mxqd.c | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 99 insertions(+), 2 deletions(-) diff --git a/mxqd.c b/mxqd.c index 95d3512..f5f4565 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1192,6 +1192,98 @@ int user_process(struct mxq_group_list *group,struct mxq_job *mxqjob) return(res); } +int reaper_process(struct mxq_server *server,struct mxq_group_list *group,struct mxq_job *job) { + pid_t pid; + struct rusage rusage; + int status; + pid_t waited_pid; + int waited_status; + struct timeval now; + struct timeval realtime; + _mx_cleanup_free_ char *finished_job_filename=NULL; + _mx_cleanup_free_ char *finished_job_tmpfilename=NULL; + FILE *out; + int res; + + res=prctl(PR_SET_CHILD_SUBREAPER, 1); + if (res<0) { + mx_log_err("set subreaper: %m"); + return res; + } + + pid = fork(); + if (pid < 0) { + mx_log_err("fork: %m"); + return(pid); + } else if (pid == 0) { + res=user_process(group,job); + _exit(EX__MAX+1); + } + gettimeofday(&job->stats_starttime, NULL); + + while (1) { + waited_pid=wait(&waited_status); + if (waited_pid<0) { + if (errno==ECHILD) { + break; + } else { + mx_log_warning("reaper: wait: %m"); + sleep(1); + } + } + if (waited_pid==pid) { + status=waited_status; + } + } + gettimeofday(&now, NULL); + timersub(&now, &job->stats_starttime, &realtime); + res=getrusage(RUSAGE_CHILDREN,&rusage); + if (res<0) { + mx_log_err("reaper: getrusage: %m"); + return(res); + } + + mx_asprintf_forever(&finished_job_filename,"%s/%lu.stat",server->finished_jobsdir,job->job_id); + 
mx_asprintf_forever(&finished_job_tmpfilename,"%s.tmp",finished_job_filename); + + out=fopen(finished_job_tmpfilename,"w"); + if (!out) { + mx_log_fatal("%s: %m",finished_job_tmpfilename); + return (-errno); + } + + fprintf(out,"1 %d %d %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld\n", + getpid(), + status, + realtime.tv_sec,realtime.tv_usec, + rusage.ru_utime.tv_sec,rusage.ru_utime.tv_usec, + rusage.ru_stime.tv_sec,rusage.ru_stime.tv_usec, + rusage.ru_maxrss, + rusage.ru_ixrss, + rusage.ru_idrss, + rusage.ru_isrss, + rusage.ru_minflt, + rusage.ru_majflt, + rusage.ru_nswap, + rusage.ru_inblock, + rusage.ru_oublock, + rusage.ru_msgsnd, + rusage.ru_msgrcv, + rusage.ru_nsignals, + rusage.ru_nvcsw, + rusage.ru_nivcsw + ); + fflush(out); + fsync(fileno(out)); + fclose(out); + res=rename(finished_job_tmpfilename,finished_job_filename); + if (res<0) { + mx_log_fatal("rename %s: %m",finished_job_tmpfilename); + return(res); + } + return(0); +} + unsigned long start_job(struct mxq_group_list *group) { struct mxq_server *server; @@ -1230,8 +1322,8 @@ unsigned long start_job(struct mxq_group_list *group) group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, mxqjob.host_pid, getpgrp()); - res=user_process(group,&mxqjob); - exit(res<0 ? 1 : 0); + res=reaper_process(server,group,&mxqjob); + _exit(res<0 ? 
EX__MAX+1 : 0); } gettimeofday(&mxqjob.stats_starttime, NULL); @@ -1981,6 +2073,7 @@ int catchall(struct mxq_server *server) { mx_log_info(" job=%s(%d):%lu:%lu host_pid=%d stats_status=%d :: child process returned.", g->user_name, g->user_uid, g->group_id, j->job_id, pid, status); + fspool_unlink(server,j->job_id); cnt+=job_has_finished(server,g,job); @@ -2126,6 +2219,8 @@ int main(int argc, char *argv[]) while (!global_sigint_cnt && !global_sigterm_cnt && !fail) { slots_returned = catchall(&server); + slots_returned += fspool_scan(&server); + if (slots_returned) mx_log_info("slots_returned=%lu :: Main Loop freed %lu slots.", slots_returned, slots_returned); @@ -2178,6 +2273,8 @@ int main(int argc, char *argv[]) while (server.jobs_running) { slots_returned = catchall(&server); + slots_returned += fspool_scan(&server); + if (slots_returned) { mx_log_info("jobs_running=%lu slots_returned=%lu global_sigint_cnt=%d global_sigterm_cnt=%d :", server.jobs_running, slots_returned, global_sigint_cnt, global_sigterm_cnt); From 8d76e231eefc5f99c464cf8d6dc4e27a4cb879f0 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Mon, 26 Oct 2015 16:36:41 +0100 Subject: [PATCH 08/24] mxqd: stop recover_from_previous_crash from deleting running jobs --- mxqd.c | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/mxqd.c b/mxqd.c index f5f4565..31cd845 100644 --- a/mxqd.c +++ b/mxqd.c @@ -2112,7 +2112,6 @@ int load_groups(struct mxq_server *server) { int recover_from_previous_crash(struct mxq_server *server) { int res1; - int res2; assert(server); assert(server->mysql); @@ -2128,16 +2127,7 @@ int recover_from_previous_crash(struct mxq_server *server) mx_log_info("hostname=%s server_id=%s :: recovered from previous crash: unassigned %d jobs.", server->hostname, server->server_id, res1); - res2 = mxq_set_job_status_unknown_for_server(server->mysql, server->hostname, server->server_id); - if (res2 < 0) { - mx_log_info("mxq_unassign_jobs_of_server() failed: %m"); - 
return res2; - } - if (res2 > 0) - mx_log_info("hostname=%s server_id=%s :: recovered from previous crash: set job_status='unknown' for %d jobs.", - server->hostname, server->server_id, res2); - - return res1+res2; + return res1; } /**********************************************************************/ From 7ec150a2edb1fd13f3275d57c9480daf01c37887 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 08:48:24 +0100 Subject: [PATCH 09/24] mxq_job: add mxq_load_jobs_running_on_server --- mxq_job.c | 35 +++++++++++++++++++++++++++++++++++ mxq_job.h | 1 + 2 files changed, 36 insertions(+) diff --git a/mxq_job.c b/mxq_job.c index 4b33fda..0c6f68c 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -704,3 +704,38 @@ int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *m return 1; } + +int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id) +{ + int res; + *mxq_jobs=NULL; + struct mxq_job j = {0}; + + struct mx_mysql_bind param = {0}; + struct mx_mysql_bind result = {0}; + + char *query = + "SELECT" + JOB_FIELDS + " FROM mxq_job" + " WHERE job_status = " status_str(MXQ_JOB_STATUS_RUNNING) + " AND host_hostname=?" 
+ " AND server_id=?"; + res = mx_mysql_bind_init_param(¶m, 2); + assert(res == 0); + + res=0; + res += mx_mysql_bind_var(¶m, 0, string, &hostname); + res += mx_mysql_bind_var(¶m, 1, string, &server_id); + assert(res == 0); + + res = bind_result_job_fields(&result, &j); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)mxq_jobs, sizeof(**mxq_jobs)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + return res; +} diff --git a/mxq_job.h b/mxq_job.h index 3bf62bb..df03910 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -107,5 +107,6 @@ int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j); int mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t group_id, char *hostname, char *server_id); int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id, char *host_id); +int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id); #endif From 2c72cac970536137af487753af71bc03ce423c06 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 08:59:32 +0100 Subject: [PATCH 10/24] mxqd: refactor (add reset_signals) --- mxqd.c | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/mxqd.c b/mxqd.c index 31cd845..d484546 100644 --- a/mxqd.c +++ b/mxqd.c @@ -887,6 +887,19 @@ static struct mxq_group_list *server_update_groupdata(struct mxq_server *server, return user_update_groupdata(user, group); } +static void reset_signals() +{ + signal(SIGINT, SIG_DFL); + signal(SIGTERM, SIG_DFL); + signal(SIGQUIT, SIG_DFL); + signal(SIGHUP, SIG_DFL); + signal(SIGTSTP, SIG_DFL); + signal(SIGTTIN, SIG_DFL); + signal(SIGTTOU, SIG_DFL); + signal(SIGCHLD, SIG_DFL); + signal(SIGPIPE, SIG_DFL); +} + 
static int init_child_process(struct mxq_group_list *group, struct mxq_job *j) { struct mxq_group *g; @@ -905,18 +918,7 @@ static int init_child_process(struct mxq_group_list *group, struct mxq_job *j) s = group->user->server; g = &group->group; - /** restore signal handler **/ - signal(SIGINT, SIG_DFL); - signal(SIGTERM, SIG_DFL); - signal(SIGQUIT, SIG_DFL); - signal(SIGHUP, SIG_DFL); - signal(SIGTSTP, SIG_DFL); - signal(SIGTTIN, SIG_DFL); - signal(SIGTTOU, SIG_DFL); - signal(SIGCHLD, SIG_DFL); - - /* reset SIGPIPE which seems to be ignored by mysqlclientlib (?) */ - signal(SIGPIPE, SIG_DFL); + reset_signals(); /** set sessionid and pgrp leader **/ pid = setsid(); @@ -1205,6 +1207,8 @@ int reaper_process(struct mxq_server *server,struct mxq_group_list *group,struc FILE *out; int res; + reset_signals(); + res=prctl(PR_SET_CHILD_SUBREAPER, 1); if (res<0) { mx_log_err("set subreaper: %m"); From 82e899220a9e88aab20be807e718ac81f2bf133c Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 23:19:43 +0100 Subject: [PATCH 11/24] mxqd: let recover_from_previous_crash rebuild state for previous jobs --- mxqd.c | 272 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 227 insertions(+), 45 deletions(-) diff --git a/mxqd.c b/mxqd.c index d484546..861b39f 100644 --- a/mxqd.c +++ b/mxqd.c @@ -652,55 +652,103 @@ void group_init(struct mxq_group_list *group) group->memory_max = memory_max; } -/**********************************************************************/ +static struct mxq_user_list *server_find_user(struct mxq_server *server,uint32_t uid) +{ + struct mxq_user_list *user_list; -struct mxq_job_list *server_remove_job_by_pid(struct mxq_server *server, pid_t pid) + for (user_list=server->users;user_list;user_list=user_list->next) + if (user_list->groups && user_list->groups[0].group.user_uid==uid) { + return user_list; + } + return NULL; +} + +static struct mxq_group_list *server_find_group(struct mxq_server *server,uint64_t group_id) { 
- struct mxq_user_list *user; - struct mxq_group_list *group; - struct mxq_job_list *job, *prev; + struct mxq_user_list *user_list; + struct mxq_group_list *group_list; - for (user=server->users; user; user=user->next) { - for (group=user->groups; group; group=group->next) { - for (job=group->jobs, prev=NULL; job; prev=job,job=job->next) { - if (job->job.host_pid == pid) { - if (prev) { - prev->next = job->next; - } else { - assert(group->jobs); - assert(group->jobs == job); + for (user_list=server->users;user_list;user_list=user_list->next) + for (group_list=user_list->groups;group_list;group_list=group_list->next) + if (group_list->group.group_id==group_id) + return group_list; + return NULL; +} - group->jobs = job->next; - } +static struct mxq_job_list *server_find_job(struct mxq_server *server,uint64_t job_id) +{ + struct mxq_user_list *user_list; + struct mxq_group_list *group_list; + struct mxq_job_list *job_list; + + for (user_list=server->users;user_list;user_list=user_list->next) + for (group_list=user_list->groups;group_list;group_list=group_list->next) + for (job_list=group_list->jobs;job_list;job_list=job_list->next) + if (job_list->job.job_id==job_id) + return job_list; + return NULL; +} - group->job_cnt--; - user->job_cnt--; - server->job_cnt--; +static struct mxq_job_list *server_find_job_by_pid(struct mxq_server *server,pid_t pid) +{ + struct mxq_user_list *user_list; + struct mxq_group_list *group_list; + struct mxq_job_list *job_list; + + for (user_list=server->users;user_list;user_list=user_list->next) + for (group_list=user_list->groups;group_list;group_list=group_list->next) + for (job_list=group_list->jobs;job_list;job_list=job_list->next) + if (job_list->job.host_pid==pid) + return job_list; + return NULL; +} - group->slots_running -= job->job.host_slots; - user->slots_running -= job->job.host_slots; - server->slots_running -= job->job.host_slots; - group->threads_running -= group->group.job_threads; - user->threads_running -= 
group->group.job_threads; - server->threads_running -= group->group.job_threads; +void server_remove_job(struct mxq_job_list *job) { + struct mxq_group_list *group=job->group; + struct mxq_user_list *user=group->user; + struct mxq_server *server=user->server; - group->group.group_jobs_running--; + struct mxq_job_list **prev; - group->jobs_running--; - user->jobs_running--; - server->jobs_running--; + for (prev=&group->jobs;*prev;prev=&(*prev)->next) { + if (*prev==job) { + *prev=job->next; + group->job_cnt--; + user->job_cnt--; + server->job_cnt--; - group->memory_used -= group->group.job_memory; - user->memory_used -= group->group.job_memory; - server->memory_used -= group->group.job_memory; + group->slots_running -= job->job.host_slots; + user->slots_running -= job->job.host_slots; + server->slots_running -= job->job.host_slots; - return job; - } - } + group->threads_running -= group->group.job_threads; + user->threads_running -= group->group.job_threads; + server->threads_running -= group->group.job_threads; + + group->group.group_jobs_running--; + + group->jobs_running--; + user->jobs_running--; + server->jobs_running--; + + group->memory_used -= group->group.job_memory; + user->memory_used -= group->group.job_memory; + server->memory_used -= group->group.job_memory; + break; } } - return NULL; +} + +struct mxq_job_list *server_remove_job_by_pid(struct mxq_server *server, pid_t pid) +{ + struct mxq_job_list *job; + + job=server_find_job_by_pid(server,pid); + if (job) { + server_remove_job(job); + } + return job; } /**********************************************************************/ @@ -1999,6 +2047,118 @@ static int fspool_scan(struct mxq_server *server) { return cnt; } +static int file_exists(char *name) { + int res; + struct stat stat_buf; + + res=stat(name,&stat_buf); + if (res<0) { + if (errno==ENOENT) { + return 0; + } else { + mx_log_warning("%s: %m",name); + return 1; + } + } else { + return 1; + } +} + +static int fspool_file_exists(struct 
mxq_server *server,uint64_t job_id) { + _mx_cleanup_free_ char *fspool_filename=NULL; + fspool_filename=fspool_get_filename(server,job_id); + return file_exists(fspool_filename); +} + +static int lost_scan_one(struct mxq_server *server) +{ + struct mxq_user_list *user_list; + struct mxq_group_list *group_list; + struct mxq_job_list *job_list; + int res; + + for (user_list=server->users;user_list;user_list=user_list->next) + for (group_list=user_list->groups;group_list;group_list=group_list->next) + for (job_list=group_list->jobs;job_list;job_list=job_list->next) { + res=kill(job_list->job.host_pid,0); + if (res<0) { + if (errno==ESRCH) { + if (!fspool_file_exists(server,job_list->job.job_id)) { + mx_log_warning("pid %u: process is gone. cancel job %d",job_list->job.host_pid,job_list->job.job_id); + server_remove_job_by_pid(server, job_list->job.host_pid); + job_list->job.job_status=MXQ_JOB_STATUS_UNKNOWN; + job_has_finished(server,&group_list->group,job_list); + return 1; + } + } else { + return -errno; + } + } + } + return 0; +} + +static int lost_scan(struct mxq_server *server) +{ + int res; + int count=0; + do { + res=lost_scan_one(server); + if (res<0) + return res; + count+=res; + } while (res>0); + return count; +} + + +static int server_reload_running(struct mxq_server *server) +{ + int job_cnt; + struct mxq_job *jobs; + int j; + + struct mxq_job_list *mxq_job_list; + struct mxq_group_list *mxq_group_list; + struct mxq_user_list *mxq_user_list; + + int group_cnt; + + job_cnt=mxq_load_jobs_running_on_server(server->mysql,&jobs,server->hostname,server->server_id); + if (job_cnt<0) + return job_cnt; + for (j=0;jstats_starttime.tv_sec=job->date_start; + + mxq_job_list=server_find_job(server,job->job_id); + if (!mxq_job_list) { + mxq_group_list=server_find_group(server,job->group_id); + if (!mxq_group_list) { + struct mxq_group *groups=NULL; + struct mxq_group *group; + group_cnt=mxq_load_group(server->mysql,&groups,job->group_id); + if (group_cnt!=1) + continue; 
+ group=&groups[0]; + mxq_user_list=server_find_user(server,group->user_uid); + if (!mxq_user_list) { + mxq_group_list=server_add_user(server,group); + } else { + mxq_group_list=user_add_group(mxq_user_list,group); + } + free(groups); + } + mxq_job_list=mxq_group_list->jobs; + } + group_add_job(mxq_group_list,job); + } + + free(jobs); + return job_cnt; +} + int catchall(struct mxq_server *server) { struct rusage rusage; @@ -2115,23 +2275,47 @@ int load_groups(struct mxq_server *server) { int recover_from_previous_crash(struct mxq_server *server) { - int res1; + int res; assert(server); assert(server->mysql); assert(server->hostname); assert(server->server_id); - res1 = mxq_unassign_jobs_of_server(server->mysql, server->hostname, server->server_id); - if (res1 < 0) { + res = mxq_unassign_jobs_of_server(server->mysql, server->hostname, server->server_id); + if (res < 0) { mx_log_info("mxq_unassign_jobs_of_server() failed: %m"); - return res1; + return res; } - if (res1 > 0) + if (res > 0) mx_log_info("hostname=%s server_id=%s :: recovered from previous crash: unassigned %d jobs.", - server->hostname, server->server_id, res1); + server->hostname, server->server_id, res); - return res1; + res=server_reload_running(server); + if (res<0) { + mx_log_err("recover: server_reload_running: %m"); + return res; + } + if (res>0) + mx_log_info("recover: reload %d running jobs from database", res); + + res=fspool_scan(server); + if (res<0) { + mx_log_err("recover: server_fspool_scan: %m"); + return res; + } + if (res>0) + mx_log_info("recover: processed %d finished jobs from fspool",res); + + res=lost_scan(server); + if (res<0) { + mx_log_err("recover: lost_scan: %m"); + return(res); + } + if (res>0) + mx_log_warning("recover: %d jobs vanished from the system",res); + + return 0; } /**********************************************************************/ @@ -2205,8 +2389,6 @@ int main(int argc, char *argv[]) mx_log_warning("recover_from_previous_crash() failed. 
Aborting execution."); fail = 1; } - if (res > 0) - mx_log_warning("total %d jobs recovered from previous crash.", res); if (server.recoveronly) fail = 1; From a07647dfe17369cc454f5f1e7f51df9e245ea4a8 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 15:47:49 +0100 Subject: [PATCH 12/24] mxqd: add SIGQUIT processing : do not kill or wait for children --- mxqd.c | 49 ++++++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/mxqd.c b/mxqd.c index 861b39f..3700147 100644 --- a/mxqd.c +++ b/mxqd.c @@ -53,6 +53,7 @@ volatile sig_atomic_t global_sigint_cnt=0; volatile sig_atomic_t global_sigterm_cnt=0; +volatile sig_atomic_t global_sigquit_cnt=0; int mxq_redirect_output(char *stdout_fname, char *stderr_fname); @@ -2332,6 +2333,11 @@ static void sig_handler(int sig) global_sigterm_cnt++; return; } + + if (sig == SIGQUIT) { + global_sigquit_cnt++; + return; + } } int main(int argc, char *argv[]) @@ -2377,7 +2383,7 @@ int main(int argc, char *argv[]) signal(SIGINT, sig_handler); signal(SIGTERM, sig_handler); - signal(SIGQUIT, SIG_IGN); + signal(SIGQUIT, sig_handler); signal(SIGHUP, SIG_IGN); signal(SIGTSTP, SIG_IGN); signal(SIGTTIN, SIG_IGN); @@ -2393,7 +2399,7 @@ int main(int argc, char *argv[]) if (server.recoveronly) fail = 1; - while (!global_sigint_cnt && !global_sigterm_cnt && !fail) { + while (!global_sigint_cnt && !global_sigterm_cnt && !global_sigquit_cnt && !fail) { slots_returned = catchall(&server); slots_returned += fspool_scan(&server); @@ -2445,28 +2451,29 @@ int main(int argc, char *argv[]) } /*** clean up ***/ - mx_log_info("global_sigint_cnt=%d global_sigterm_cnt=%d : Exiting.", global_sigint_cnt, global_sigterm_cnt); + mx_log_info("global_sigint_cnt=%d global_sigterm_cnt=%d global_sigquit_cnt=%d: Exiting.", global_sigint_cnt, global_sigterm_cnt,global_sigquit_cnt); - while (server.jobs_running) { - slots_returned = catchall(&server); - slots_returned += fspool_scan(&server); + if 
(global_sigterm_cnt||global_sigint_cnt) { + while (server.jobs_running) { + slots_returned = catchall(&server); + slots_returned += fspool_scan(&server); - if (slots_returned) { - mx_log_info("jobs_running=%lu slots_returned=%lu global_sigint_cnt=%d global_sigterm_cnt=%d :", - server.jobs_running, slots_returned, global_sigint_cnt, global_sigterm_cnt); - continue; + if (slots_returned) { + mx_log_info("jobs_running=%lu slots_returned=%lu global_sigint_cnt=%d global_sigterm_cnt=%d :", + server.jobs_running, slots_returned, global_sigint_cnt, global_sigterm_cnt); + continue; + } + if (global_sigint_cnt) + killall(&server, SIGTERM, 1); + + killallcancelled(&server, SIGTERM, 0); + killallcancelled(&server, SIGINT, 0); + killall_over_time(&server); + killall_over_memory(&server); + mx_log_info("jobs_running=%lu global_sigint_cnt=%d global_sigterm_cnt=%d : Exiting. Wating for jobs to finish. Sleeping for a while.", + server.jobs_running, global_sigint_cnt, global_sigterm_cnt); + sleep(1); } - if (global_sigint_cnt) - killall(&server, SIGTERM, 0); - - killallcancelled(&server, SIGTERM, 0); - killallcancelled(&server, SIGINT, 0); - killall_over_time(&server); - killall_over_memory(&server); - - mx_log_info("jobs_running=%lu global_sigint_cnt=%d global_sigterm_cnt=%d : Exiting. Wating for jobs to finish. 
Sleeping for a while.", - server.jobs_running, global_sigint_cnt, global_sigterm_cnt); - sleep(1); } mx_mysql_finish(&(server.mysql)); From 272f6fc6716ea833cfe88ed9f5c3208e2a4afeb4 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 12:03:44 +0100 Subject: [PATCH 13/24] mxq_job: add mxq_set_job_status_unknown --- mxq_job.c | 23 +++++++++++++++++++++++ mxq_job.h | 1 + 2 files changed, 24 insertions(+) diff --git a/mxq_job.c b/mxq_job.c index 0c6f68c..9a2f2f8 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -573,6 +573,29 @@ int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname return res; } +int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job) +{ + int res; + struct mx_mysql_bind param = {0}; + + char *query = + "UPDATE mxq_job SET" + " job_status = " status_str(MXQ_JOB_STATUS_UNKNOWN) + " WHERE job_id = ?"; + + res = mx_mysql_bind_init_param(&param, 1); + res += mx_mysql_bind_var(&param, 0, uint64, &job->job_id); + assert(res == 0); + + res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + return res; +} + int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j) { if (!mx_streq(j->job_stdout, "/dev/null")) { diff --git a/mxq_job.h b/mxq_job.h index df03910..52259ca 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -103,6 +103,7 @@ int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *se int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job); +int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname, char *server_id); int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j); int 
mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t group_id, char *hostname, char *server_id); From c6bd46fbd25f0e0611b1185b7ecc7efbb776ec5a Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 12:50:46 +0100 Subject: [PATCH 14/24] mxqd: do not kill children in catchall --- mxqd.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/mxqd.c b/mxqd.c index 3700147..9c20b0e 100644 --- a/mxqd.c +++ b/mxqd.c @@ -2202,12 +2202,6 @@ int catchall(struct mxq_server *server) { mx_log_err("FIX ME BUG!!! pid=%d errno=%d (%m)", pid, errno); continue; } - /* valid job returned.. */ - - /* kill possible leftovers with SIGKILL */ - res = kill(-siginfo.si_pid, SIGKILL); - if (res == -1) - mx_log_err("kill process group pgrp=%d failed: %m", -siginfo.si_pid); /* reap child and save new state */ pid = wait4(siginfo.si_pid, &status, WNOHANG, &rusage); From 274bfd33211d9c064acf7a91606ac2095825c0d1 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 12:53:15 +0100 Subject: [PATCH 15/24] mxqd: add job_is_lost --- mxqd.c | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/mxqd.c b/mxqd.c index 9c20b0e..501973b 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1915,6 +1915,40 @@ static int job_has_finished (struct mxq_server *server,struct mxq_group *g,struc return cnt; } +static int job_is_lost (struct mxq_server *server,struct mxq_group *g,struct mxq_job_list *job) +{ + int cnt=0; + int res; + struct mxq_job *j=&job->job; + + mxq_set_job_status_unknown(server->mysql, j); + g->group_jobs_unknown++; + + mxq_job_set_tmpfilenames(g, j); + + if (!mx_streq(j->job_stdout, "/dev/null")) { + res = rename(j->tmp_stdout, j->job_stdout); + if (res == -1) { + mx_log_err(" job=%s(%d):%lu:%lu host_pid=%d :: rename(stdout) failed: %m", + g->user_name, g->user_uid, g->group_id, j->job_id, j->host_pid); + } + } + + if (!mx_streq(j->job_stderr, "/dev/null") && !mx_streq(j->job_stderr, 
j->job_stdout)) { + res = rename(j->tmp_stderr, j->job_stderr); + if (res == -1) { + mx_log_err(" job=%s(%d):%lu:%lu host_pid=%d :: rename(stderr) failed: %m", + g->user_name, g->user_uid, g->group_id, j->job_id, j->host_pid); + } + } + + cnt += job->group->slots_per_job; + cpuset_clear_running(&server->cpu_set_running,&j->host_cpu_set); + mxq_job_free_content(j); + free(job); + return cnt; +} + static char *fspool_get_filename (struct mxq_server *server,long unsigned int job_id) { char *fspool_filename; @@ -2088,7 +2122,7 @@ static int lost_scan_one(struct mxq_server *server) mx_log_warning("pid %u: process is gone. cancel job %d",job_list->job.host_pid,job_list->job.job_id); server_remove_job_by_pid(server, job_list->job.host_pid); job_list->job.job_status=MXQ_JOB_STATUS_UNKNOWN; - job_has_finished(server,&group_list->group,job_list); + job_is_lost(server,&group_list->group,job_list); return 1; } } else { From e3db7214657e8cba68205ab001e63d9f21335eb2 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 13:52:39 +0100 Subject: [PATCH 16/24] mxqd: let reaper call setsid instread of user process --- mxqd.c | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/mxqd.c b/mxqd.c index 501973b..ca03aa7 100644 --- a/mxqd.c +++ b/mxqd.c @@ -954,7 +954,6 @@ static int init_child_process(struct mxq_group_list *group, struct mxq_job *j) struct mxq_group *g; struct mxq_server *s; struct passwd *passwd; - pid_t pid; int res; int fh; struct rlimit rlim; @@ -969,13 +968,6 @@ static int init_child_process(struct mxq_group_list *group, struct mxq_job *j) reset_signals(); - /** set sessionid and pgrp leader **/ - pid = setsid(); - if (pid == -1) { - mx_log_err("job=%s(%d):%lu:%lu setsid(): %m", - g->user_name, g->user_uid, g->group_id, j->job_id); - } - passwd = getpwuid(g->user_uid); if (!passwd) { mx_log_err("job=%s(%d):%lu:%lu getpwuid(): %m", @@ -1258,6 +1250,11 @@ int reaper_process(struct mxq_server *server,struct mxq_group_list 
*group,struc reset_signals(); + res = setsid(); + if (res<0) { + mx_log_warning("reaper_process setsid: %m"); + } + res=prctl(PR_SET_CHILD_SUBREAPER, 1); if (res<0) { mx_log_err("set subreaper: %m"); From 4a28bf6fde2fa39c32326dd9d51e896fb6bd203e Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 14:35:53 +0100 Subject: [PATCH 17/24] mxqd: remove unused member --- mxqd.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/mxqd.h b/mxqd.h index 1259ddd..7e56c61 100644 --- a/mxqd.h +++ b/mxqd.h @@ -11,8 +11,6 @@ struct mxq_job_list { struct mxq_job job; unsigned long long int max_sum_rss; - - pid_t pid; }; struct mxq_group_list { From fe360f47b780d2f54ed7cd08e11416c1969d332c Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 15:02:59 +0100 Subject: [PATCH 18/24] mxqd: better loglevels for killall_over_time --- mxqd.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mxqd.c b/mxqd.c index ca03aa7..6e48925 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1716,7 +1716,7 @@ int killall_over_time(struct mxq_server *server) /* limit killing to every >= 60 seconds */ mx_within_rate_limit_or_return(60, 1); - mx_log_info("killall_over_time: Sending signals to all jobs running longer than requested."); + mx_log_debug("killall_over_time: Sending signals to all jobs running longer than requested."); gettimeofday(&now, NULL); @@ -1731,13 +1731,13 @@ int killall_over_time(struct mxq_server *server) pid = job->job.host_pid; if (delta.tv_sec <= group->group.job_time*61) { - mx_log_debug("killall_over_time(): Sending signal=XCPU to job=%s(%d):%lu:%lu pid=%d", + mx_log_info("killall_over_time(): Sending signal=XCPU to job=%s(%d):%lu:%lu pid=%d", group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, pid); kill(pid, SIGXCPU); continue; } - mx_log_debug("killall_over_time(): Sending signal=XCPU to job=%s(%d):%lu:%lu pgrp=%d", + mx_log_info("killall_over_time(): Sending signal=XCPU to job=%s(%d):%lu:%lu pgrp=%d", 
group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, pid); kill(-pid, SIGXCPU); From 29d49c751524108b1094e6fe3d355ade9fbaf0fc Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 28 Oct 2015 23:26:16 +0100 Subject: [PATCH 19/24] mxqd: do not finish jobs from signals when we have reaper output --- mxqd.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/mxqd.c b/mxqd.c index 6e48925..31e7062 100644 --- a/mxqd.c +++ b/mxqd.c @@ -2223,7 +2223,7 @@ int catchall(struct mxq_server *server) { assert(siginfo.si_pid > 1); - job = server_remove_job_by_pid(server, siginfo.si_pid); + job = server_find_job_by_pid(server,siginfo.si_pid); if (!job) { mx_log_warning("unknown pid returned.. si_pid=%d si_uid=%d si_code=%d si_status=%d getpgid(si_pid)=%d getsid(si_pid)=%d", siginfo.si_pid, siginfo.si_uid, siginfo.si_code, siginfo.si_status, @@ -2233,6 +2233,12 @@ int catchall(struct mxq_server *server) { mx_log_err("FIX ME BUG!!! pid=%d errno=%d (%m)", pid, errno); continue; } + if (fspool_file_exists(server,job->job.job_id)) { + waitpid(siginfo.si_pid, &status, WNOHANG); + continue; + } + mx_log_err("reaper died. status=%d. 
Cleaning up job from catchall.",status); + server_remove_job(job); /* reap child and save new state */ pid = wait4(siginfo.si_pid, &status, WNOHANG, &rusage); From 61ddd6f4ca0096a28f70b7bf3a40a6f248a5e6d2 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 29 Oct 2015 16:43:24 +0100 Subject: [PATCH 20/24] mxq_job: refactor (add do_jobs_statement) --- mxq_job.c | 64 ++++++++++++++++++++++++------------------------------- 1 file changed, 28 insertions(+), 36 deletions(-) diff --git a/mxq_job.c b/mxq_job.c index 9a2f2f8..3e8a3e8 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -12,6 +12,7 @@ #include "mx_util.h" #include "mx_log.h" +#include "mx_util.h" #include "mxq_group.h" #include "mxq_job.h" @@ -164,13 +165,28 @@ void mxq_job_free_content(struct mxq_job *j) j->job_argv = NULL; } +static int do_jobs_statement(struct mx_mysql *mysql, char *query, struct mx_mysql_bind *param, struct mxq_job **jobs) +{ + int res; + struct mxq_job j = {0}; + struct mx_mysql_bind result = {0}; + + res = bind_result_job_fields(&result, &j); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, param, &result, &j, (void **)jobs, sizeof(**jobs)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + return res; +} + int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id) { int res; struct mxq_job *jobs = NULL; - struct mxq_job j = {0}; struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; assert(mysql); assert(mxq_jobs); @@ -189,12 +205,8 @@ int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job res = mx_mysql_bind_var(&param, 0, uint64, &job_id); assert(res == 0); - res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, &param, &result, &j, (void **)&jobs, sizeof(*jobs)); + res=do_jobs_statement(mysql, query, &param, &jobs); if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); return res; } @@ -202,13 +214,13 @@ int 
mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job return res; } + + int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp) { int res; struct mxq_job *jobs = NULL; - struct mxq_job j = {0}; struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; assert(mysql); assert(mxq_jobs); @@ -227,12 +239,8 @@ int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, st res = mx_mysql_bind_var(&param, 0, uint64, &(grp->group_id)); assert(res == 0); - res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, &param, &result, &j, (void **)&jobs, sizeof(*jobs)); + res=do_jobs_statement(mysql, query, &param, &jobs); if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); return res; } @@ -244,9 +252,7 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job ** { int res; struct mxq_job *jobs = NULL; - struct mxq_job j = {0}; struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; assert(mysql); assert(mxq_jobs); @@ -268,12 +274,8 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job ** res += mx_mysql_bind_var(&param, 1, uint64, &job_status); assert(res == 0); - res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, &param, &result, &j, (void **)&jobs, sizeof(*jobs)); + res=do_jobs_statement(mysql, query, &param, &jobs); if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); return res; } @@ -628,9 +630,7 @@ int mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mx { int res; struct mxq_job *jobs = NULL; - struct mxq_job j = {0}; struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; assert(mysql); assert(mxq_jobs); @@ -659,12 +659,8 @@ int mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mx res += mx_mysql_bind_var(&param, 2, uint64, &group_id); assert(res == 0); 
- res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, &param, &result, &j, (void **)&jobs, sizeof(*jobs)); + res=do_jobs_statement(mysql, query, &param, &jobs); if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); return res; } @@ -731,11 +727,9 @@ int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *m int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id) { int res; - *mxq_jobs=NULL; - struct mxq_job j = {0}; + struct mxq_job *jobs = NULL; struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; char *query = "SELECT" @@ -752,13 +746,11 @@ int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **mxq res += mx_mysql_bind_var(&param, 1, string, &server_id); assert(res == 0); - res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, &param, &result, &j, (void **)mxq_jobs, sizeof(**mxq_jobs)); + res=do_jobs_statement(mysql, query, &param, &jobs); if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); return res; } + + *mxq_jobs = jobs; return res; } From 7dee7ef92a8b7c14ebb9f33a546bc64a7b080684 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 29 Oct 2015 16:45:22 +0100 Subject: [PATCH 21/24] mxq_job: add a string version of host_cpu_set --- mxq_job.h | 1 + mxqd.c | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/mxq_job.h b/mxq_job.h index 52259ca..921a484 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -45,6 +45,7 @@ struct mxq_job { uint32_t host_pid; uint32_t host_slots; cpu_set_t host_cpu_set; + char * host_cpu_set_str; int64_t date_submit; int64_t date_start; diff --git a/mxqd.c b/mxqd.c index 31e7062..261bf7f 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1357,7 +1357,8 @@ unsigned long start_job(struct mxq_group_list *group) group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id); 
cpuset_init_job(&mxqjob.host_cpu_set,&server->cpu_set_available,&server->cpu_set_running,group->slots_per_job); - cpuset_log(" job assigned cpus: ",&mxqjob.host_cpu_set); + mxqjob.host_cpu_set_str=mx_cpuset_to_str(&mxqjob.host_cpu_set); + mx_log_info("job assigned cpus: [%s]",mxqjob.host_cpu_set_str); mx_mysql_disconnect(server->mysql); From 35af203b8568a5f1c6fc6932b76366a820bfa90c Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 29 Oct 2015 16:47:28 +0100 Subject: [PATCH 22/24] database: store and retrieve cpuset of job --- mxq_job.c | 21 ++++++++++++++------- mysql/alter_tables_0.18.2.sql | 5 +++++ mysql/create_tables.sql | 1 + web/pages/mxq/mxq.in | 1 + 4 files changed, 21 insertions(+), 7 deletions(-) create mode 100644 mysql/alter_tables_0.18.2.sql diff --git a/mxq_job.c b/mxq_job.c index 3e8a3e8..d0e930e 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -17,7 +17,7 @@ #include "mxq_group.h" #include "mxq_job.h" -#define JOB_FIELDS_CNT 36 +#define JOB_FIELDS_CNT 37 #define JOB_FIELDS \ " job_id, " \ " job_status, " \ @@ -36,6 +36,7 @@ " host_hostname, " \ " host_pid, " \ " host_slots, " \ + " host_cpu_set, " \ " UNIX_TIMESTAMP(date_submit) as date_submit, " \ " UNIX_TIMESTAMP(date_start) as date_start, " \ " UNIX_TIMESTAMP(date_end) as date_end, " \ @@ -81,6 +82,7 @@ static int bind_result_job_fields(struct mx_mysql_bind *result, struct mxq_job * res += mx_mysql_bind_var(result, idx++, string, &(j->host_hostname)); res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_pid)); res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_slots)); + res += mx_mysql_bind_var(result, idx++, string, &(j->host_cpu_set_str)); res += mx_mysql_bind_var(result, idx++, int64, &(j->date_submit)); res += mx_mysql_bind_var(result, idx++, int64, &(j->date_start)); res += mx_mysql_bind_var(result, idx++, int64, &(j->date_end)); @@ -159,6 +161,7 @@ void mxq_job_free_content(struct mxq_job *j) mx_free_null(j->tmp_stderr); mx_free_null(j->host_submit); mx_free_null(j->host_id); + 
mx_free_null(j->host_cpu_set_str); mx_free_null(j->server_id); mx_free_null(j->host_hostname); mx_free_null(j->job_argv); @@ -167,7 +170,7 @@ void mxq_job_free_content(struct mxq_job *j) static int do_jobs_statement(struct mx_mysql *mysql, char *query, struct mx_mysql_bind *param, struct mxq_job **jobs) { - int res; + int res,i; struct mxq_job j = {0}; struct mx_mysql_bind result = {0}; @@ -179,6 +182,8 @@ static int do_jobs_statement(struct mx_mysql *mysql, char *query, struct mx_mysq mx_log_err("mx_mysql_do_statement(): %m"); return res; } + for (i=0;ihost_pid)); res += mx_mysql_bind_var(&param, 1, uint32, &(job->host_slots)); - res += mx_mysql_bind_var(&param, 2, uint64, &(job->job_id)); - res += mx_mysql_bind_var(&param, 3, string, &(job->host_hostname)); - res += mx_mysql_bind_var(&param, 4, string, &(job->server_id)); + res += mx_mysql_bind_var(&param, 2, string, &(job->host_cpu_set_str)); + res += mx_mysql_bind_var(&param, 3, uint64, &(job->job_id)); + res += mx_mysql_bind_var(&param, 4, string, &(job->host_hostname)); + res += mx_mysql_bind_var(&param, 5, string, &(job->server_id)); assert(res == 0); res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param); diff --git a/mysql/alter_tables_0.18.2.sql b/mysql/alter_tables_0.18.2.sql new file mode 100644 index 0000000..1be5d73 --- /dev/null +++ b/mysql/alter_tables_0.18.2.sql @@ -0,0 +1,5 @@ +ALTER TABLE mxq_job + ADD COLUMN + host_cpu_set VARCHAR(4095) NOT NULL DEFAULT "" + AFTER + host_slots; diff --git a/mysql/create_tables.sql b/mysql/create_tables.sql index f2566cc..f589f10 100644 --- a/mysql/create_tables.sql +++ b/mysql/create_tables.sql @@ -87,6 +87,7 @@ CREATE TABLE IF NOT EXISTS mxq_job ( host_hostname VARCHAR(64) NOT NULL DEFAULT "", host_pid INT4 UNSIGNED NOT NULL DEFAULT 0, host_slots INT4 UNSIGNED NOT NULL DEFAULT 0, + host_cpu_set VARCHAR(4095) NOT NULL DEFAULT "", date_submit TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, date_start TIMESTAMP NOT NULL DEFAULT 0, diff --git a/web/pages/mxq/mxq.in 
b/web/pages/mxq/mxq.in index 422c14b..0db5c21 100755 --- a/web/pages/mxq/mxq.in +++ b/web/pages/mxq/mxq.in @@ -356,6 +356,7 @@ host_id : $o{host_id} host_hostname : $o{host_hostname} host_pid : $o{host_pid} host_slots : $o{host_slots} +host_cpu_set : $o{host_cpu_set} date_submit : $o{date_submit} date_start : $o{date_start} $ago From 78bec97b84961e737b4e290851c80c06bcb54b57 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Fri, 30 Oct 2015 10:27:36 +0100 Subject: [PATCH 23/24] mxqd: set cpu_set_running in group_add_job so it is applied to reloaded jobs as well --- mxqd.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mxqd.c b/mxqd.c index 261bf7f..276dc68 100644 --- a/mxqd.c +++ b/mxqd.c @@ -126,7 +126,6 @@ static void cpuset_init_job(cpu_set_t *job_cpu_set,cpu_set_t *available,cpu_set_ for (cpu=CPU_SETSIZE-1;slots&&cpu>=0;cpu--) { if (CPU_ISSET(cpu,available) && !CPU_ISSET(cpu,running)) { CPU_SET(cpu,job_cpu_set); - CPU_SET(cpu,running); slots--; } } @@ -828,6 +827,8 @@ struct mxq_job_list *group_add_job(struct mxq_group_list *group, struct mxq_job user->threads_running += mxqgrp->job_threads; server->threads_running += mxqgrp->job_threads; + CPU_OR(&server->cpu_set_running,&server->cpu_set_running,&j->job.host_cpu_set); + mxqgrp->group_jobs_running++; mxqgrp->group_jobs_inq--; From f23144ea74a984e81fed169191d8fbd8fdb830d8 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Fri, 30 Oct 2015 11:57:12 +0100 Subject: [PATCH 24/24] mxqd: reaper: ignore signals from mxqd --- mxqd.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mxqd.c b/mxqd.c index 276dc68..6ebc9c7 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1251,6 +1251,11 @@ int reaper_process(struct mxq_server *server,struct mxq_group_list *group,struc reset_signals(); + signal(SIGINT, SIG_IGN); + signal(SIGTERM, SIG_IGN); + signal(SIGHUP, SIG_IGN); + signal(SIGXCPU, SIG_IGN); + res = setsid(); if (res<0) { mx_log_warning("reaper_process setsid: %m");