diff --git a/.gitignore b/.gitignore index 61b51d6d..7a5de194 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ parser.tab.o test_parser.o test_parser ppidcache.o +mxq_reaper.o mxqsub /mxqsub.1 @@ -44,6 +45,7 @@ test_mx_log test_mx_mysq test_mxqd_control test_keywordset +mxq_reaper /web/pages/mxq/mxq web/lighttpd.conf diff --git a/Makefile b/Makefile index d9ec74e5..8fab4f6b 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ MXQ_VERSION_MAJOR = 0 MXQ_VERSION_MINOR = 30 -MXQ_VERSION_PATCH = 5 +MXQ_VERSION_PATCH = 6 MXQ_VERSION_EXTRA = "beta" MXQ_VERSIONDATE = 2022 @@ -122,7 +122,7 @@ CFLAGS_MYSQL += -DMX_MYSQL_FAIL_WAIT_DEFAULT=5 CFLAGS += -g CFLAGS += -O3 -CFLAGS += -Wall +CFLAGS += -Wall -Wextra -Wno-override-init CFLAGS += -DMXQ_VERSION=\"${MXQ_VERSION}\" CFLAGS += -DMXQ_VERSIONFULL=\"${MXQ_VERSIONFULL}\" CFLAGS += -DMXQ_VERSIONDATE=\"${MXQ_VERSIONDATE}\" @@ -289,6 +289,13 @@ install:: ######################################################################## +.PHONY: scan-build + +scan-build:: + scan-build $(MAKE) build + +######################################################################## + ### mx_log.h ----------------------------------------------------------- mx_log.h += mx_log.h @@ -499,7 +506,6 @@ mxqd.o: CFLAGS += $(CFLAGS_MYSQL) mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_PATH) mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_TMPDIR) mxqd.o: CFLAGS += $(CFLAGS_MXQ_FINISHED_JOBSDIR) -mxqd.o: CFLAGS += -Wno-unused-but-set-variable clean: CLEAN += mxqd.o @@ -587,7 +593,6 @@ mxqdump: mxq_job.o mxqdump: mx_util.o mxqdump: mx_getopt.o mxqdump: LDLIBS += $(LDLIBS_MYSQL) -mxqdump: CFLAGS += -Wunused-function build: mxqdump @@ -656,10 +661,17 @@ build: mxqps clean: CLEAN += mxqps +### mxqps ------------------------------------------------------------- + +clean: CLEAN += mxq_reaper.o mxq_reaper +build: mxq_reaper +install:: mxq_reaper + $(call quiet-install,0755,$^,${DESTDIR}${LIBEXECDIR}/mxq/mxq_reaper) + ### script helper 
----------------------------------------------------- -install:: helper/create_job_tmpdir - $(call quiet-install,0755,$^,${DESTDIR}${LIBEXECDIR}/mxq/create_job_tmpdir) +install:: helper/tmpdir-setup + $(call quiet-install,0755,$^,${DESTDIR}${LIBEXECDIR}/mxq/tmpdir-setup) install:: helper/gpu-setup $(call quiet-install,0755,$^,${DESTDIR}${LIBEXECDIR}/mxq/gpu-setup) diff --git a/helper/create_job_tmpdir b/helper/create_job_tmpdir index 3d74e3e1..01207427 100755 --- a/helper/create_job_tmpdir +++ b/helper/create_job_tmpdir @@ -37,6 +37,6 @@ if fallocate -l ${MXQ_SIZE}G $filename; then fi rm $filename else - test -e $fileame && rm $filename + test -e $filename && rm $filename fi exit $status diff --git a/helper/tmpdir-setup b/helper/tmpdir-setup new file mode 100755 index 00000000..fe47dc79 --- /dev/null +++ b/helper/tmpdir-setup @@ -0,0 +1,76 @@ +#! /usr/bin/bash + +usage() { + cat <&2 +usage: + $0 create JOBID SIZE UID # create SIZE GiB /dev/shm/mxqd/mnt/job/$JOBID + $0 cleanup JOBID # cleanup +EOF + exit 1 +} + +tmpdir=/scratch/local2/mxqd/tmp +mntdir=/dev/shm/mxqd/mnt/job + +cmd_create() { + (( $# == 3 )) || usage + MXQ_JOBID=$1 + MXQ_SIZE=$2 + MXQ_UID=$3 + + filename=$tmpdir/$MXQ_JOBID.tmp + mountpoint=$mntdir/$MXQ_JOBID + + umask 006 + mkdir -p $tmpdir + mkdir -p $mntdir + + status=1; + + if fallocate -l ${MXQ_SIZE}G $filename; then + if loopdevice=$(losetup --find --show $filename); then + if mkfs.ext4 \ + -q \ + -m 0 \ + -E nodiscard,mmp_update_interval=300,lazy_journal_init=1,root_owner=$MXQ_UID:0 \ + -O '64bit,ext_attr,filetype,^has_journal,huge_file,inline_data,^mmp,^quota,sparse_super2' \ + $loopdevice \ + && mkdir -p $mountpoint && mount -Odata=writeback,barrier=0 $loopdevice $mountpoint; then + rmdir $mountpoint/lost+found + status=0 + fi + losetup -d $loopdevice + fi + rm $filename + else + test -e $filename && rm $filename + fi + exit $status +} + +cmd_cleanup() { + (( $# == 1 )) || usage + MXQ_JOBID=$1 + + ( + shopt -s dotglob; + rm -rf 
/dev/shm/mxqd/mnt/job/$MXQ_JOBID/* + umount /dev/shm/mxqd/mnt/job/$MXQ_JOBID + rmdir /dev/shm/mxqd/mnt/job/$MXQ_JOBID + ) & +} + +(( $# > 0 )) || usage +cmd="$1" +shift; +case "$cmd" in + create) + cmd_create "$@" + ;; + cleanup) + cmd_cleanup "$@" + ;; + *) + usage + ;; +esac diff --git a/keywordset.c b/keywordset.c index 1ce14df3..e9d6da95 100644 --- a/keywordset.c +++ b/keywordset.c @@ -15,7 +15,7 @@ struct keywordset { static int find_name(struct keywordset *kws, char *name, size_t len) { int i; - int j; + size_t j; for ( i = 0; i < kws->used ; i++ ) { j = 0; while(1) { diff --git a/mx_mysql.c b/mx_mysql.c index ac066671..87e4789b 100644 --- a/mx_mysql.c +++ b/mx_mysql.c @@ -699,11 +699,9 @@ static int _mx_mysql_bind_string(struct mx_mysql_bind *b, unsigned int index, ch static int _mx_mysql_bind_validate(struct mx_mysql_bind *b) { - int i; - mx_assert_return_minus_errno(b, EINVAL); - for (i=0; i < b->count; i++) { + for (unsigned long i=0; i < b->count; i++) { if (!(b->data[i].flags)) { return -(errno=EBADSLT); } @@ -947,6 +945,7 @@ static int mx_mysql_statement_init(struct mx_mysql *mysql, struct mx_mysql_stmt s = mx_calloc_forever(1, sizeof(*s)); s->mysql = mysql; + s->func = ""; do { res = mx__mysql_stmt_init(s); @@ -1015,7 +1014,6 @@ int mx_mysql_statement_fetch(struct mx_mysql_stmt *stmt) { struct mx_mysql_bind *r; int res; - int col; char *str; int no_error = 1; @@ -1044,7 +1042,7 @@ int mx_mysql_statement_fetch(struct mx_mysql_stmt *stmt) } r = &stmt->result; - for (col = 0; col < r->count; col++) { + for (unsigned long col = 0; col < r->count; col++) { if (r->bind[col].buffer_type == MYSQL_TYPE_STRING) { str = mx_calloc_forever(r->data[col].length + 1, sizeof(*str)); @@ -1063,7 +1061,7 @@ int mx_mysql_statement_fetch(struct mx_mysql_stmt *stmt) if (!(r->data[col].is_error)) continue; - mx_log_debug("WARNING: result data returned in column with index %d was truncated. 
query was:", col); + mx_log_debug("WARNING: result data returned in column with index %lu was truncated. query was:", col); mx_log_debug(" \\ %s", stmt->statement); no_error = 0; } @@ -1165,7 +1163,6 @@ static int _mx_mysql_do_statement(struct mx_mysql *mysql, char *query, struct mx struct mx_mysql_stmt *stmt = NULL; unsigned long long num_rows = 0; int res; - int cnt = 0; char *tmpdata; assert(mysql); @@ -1196,7 +1193,7 @@ static int _mx_mysql_do_statement(struct mx_mysql *mysql, char *query, struct mx if (result && result->count && num_rows) { tmpdata = mx_calloc_forever(num_rows, size); - for (cnt = 0; cnt < num_rows; cnt++) { + for (unsigned long cnt = 0; cnt < num_rows; cnt++) { res = mx_mysql_statement_fetch(stmt); if (res < 0) { mx_log_err("mx_mysql_statement_fetch(): %m"); diff --git a/mx_proc.c b/mx_proc.c index 48e590cc..72330df4 100644 --- a/mx_proc.c +++ b/mx_proc.c @@ -10,15 +10,14 @@ #include "mx_proc.h" static long long int get_rss_anon(pid_t pid) { - _mx_cleanup_free_ char *fname; - mx_asprintf_forever(&fname, "/proc/%d/status", pid); + _mx_cleanup_free_ char *fname = mx_asprintf_forever("/proc/%d/status", pid); _mx_cleanup_fclose_ FILE *file = fopen(fname, "r"); if (file == NULL) return -errno; _mx_cleanup_free_ char *buf = NULL; size_t n = 0; while(1) { - size_t len = getline(&buf, &n, file); + ssize_t len = getline(&buf, &n, file); if (len == -1) break; if (strncmp(buf, "RssAnon:", 8) == 0) { @@ -131,7 +130,6 @@ int mx_proc_pid_stat(struct mx_proc_pid_stat **pps, pid_t pid) int mx_proc_pid_stat_read(struct mx_proc_pid_stat *pps, char *fmt, ...) { - _mx_cleanup_free_ char *fname = NULL; _mx_cleanup_free_ char *line = NULL; va_list ap; int res; @@ -139,7 +137,7 @@ int mx_proc_pid_stat_read(struct mx_proc_pid_stat *pps, char *fmt, ...) 
assert(pps); va_start(ap, fmt); - mx_vasprintf_forever(&fname, fmt, ap); + _mx_cleanup_free_ char *fname = fname = mx_vasprintf_forever(fmt, ap); va_end(ap); res = mx_read_first_line_from_file(fname, &line); diff --git a/mx_util.c b/mx_util.c index 66a51f28..bc472ab0 100644 --- a/mx_util.c +++ b/mx_util.c @@ -112,17 +112,17 @@ int mx_strtobytes(char *str, unsigned long long int *bytes) case 'T': /* tebi */ t *= 1024; - + // fall through case 'G': /* gibi */ t *= 1024; - + // fall through case 'M': /* mebi */ t *= 1024; - + // fall through case 'k': /* kibi */ case 'K': t *= 1024; - + // fall through case 'B': /* bytes */ end++; break; @@ -180,19 +180,19 @@ int mx_strtoseconds(char *str, unsigned long long int *seconds) case 'y': /* years */ t *= 52; - + // fall through case 'w': /* weeks */ t *= 7; - + // fall through case 'd': /* days */ t *= 24; - + // fall through case 'h': /* hours */ t *= 60; - + // fall through case 'm': /* minutes */ t *= 60; - + // fall through case 's': /* seconds */ end++; break; @@ -256,7 +256,7 @@ int mx_strtoul(char *str, unsigned long int *to) ul = strtoul(str, &end, 0); - if (errno) + if (errno > 0) return -errno; end = mx_strskipwhitespaces(end); @@ -284,7 +284,7 @@ int mx_strtoull(char *str, unsigned long long int *to) ull = strtoull(str, &end, 0); - if (errno) + if (errno > 0) return -errno; end = mx_strskipwhitespaces(end); @@ -314,7 +314,7 @@ int mx_strtol(char *str, signed long int *to) l = strtoul(str, &end, 0); - if (errno) + if (errno > 0) return -errno; end = mx_strskipwhitespaces(end); @@ -339,7 +339,7 @@ int mx_strtoll(char *str, signed long long int *to) ll = strtoll(str, &end, 0); - if (errno) + if (errno > 0) return -errno; end = mx_strskipwhitespaces(end); @@ -356,7 +356,7 @@ int mx_strtoll(char *str, signed long long int *to) int mx_strtoui(char *str, unsigned int *to) { - unsigned long int ul = 0; /* avoid false maybe-uninitialized warning */ + unsigned long int ul; int res; assert(str); @@ -376,7 +376,7 @@ int 
mx_strtoui(char *str, unsigned int *to) int mx_strtou8(char *str, uint8_t *to) { - unsigned long int ul = 0; /* avoid false maybe-uninitialized warning */ + unsigned long int ul; int res; assert(str); @@ -396,7 +396,7 @@ int mx_strtou8(char *str, uint8_t *to) int mx_strtou16(char *str, uint16_t *to) { - unsigned long int ul = 0; /* avoid false maybe-uninitialized warning */ + unsigned long int ul; int res; assert(str); @@ -416,7 +416,7 @@ int mx_strtou16(char *str, uint16_t *to) int mx_strtou32(char *str, uint32_t *to) { - unsigned long int ul = 0; /* avoid false maybe-uninitialized warning */ + unsigned long int ul; int res; assert(str); @@ -436,7 +436,7 @@ int mx_strtou32(char *str, uint32_t *to) int mx_strtou64(char *str, uint64_t *to) { - unsigned long long int ull = 0; /* avoid false maybe-uninitialized warning */; + unsigned long long int ull; int res; assert(str); @@ -458,7 +458,7 @@ int mx_strtou64(char *str, uint64_t *to) int mx_strtoi(char *str, signed int *to) { - signed long int l = 0; /* avoid false maybe-uninitialized warning */ + signed long int l; int res; assert(str); @@ -478,7 +478,7 @@ int mx_strtoi(char *str, signed int *to) int mx_strtoi8(char *str, int8_t *to) { - signed long int l = 0; /* avoid false maybe-uninitialized warning */ + signed long int l; int res; assert(str); @@ -498,7 +498,7 @@ int mx_strtoi8(char *str, int8_t *to) int mx_strtoi16(char *str, int16_t *to) { - signed long int l = 0; /* avoid false maybe-uninitialized warning */ + signed long int l; int res; assert(str); @@ -518,7 +518,7 @@ int mx_strtoi16(char *str, int16_t *to) int mx_strtoi32(char *str, int32_t *to) { - signed long int l = 0; /* avoid false maybe-uninitialized warning */ + signed long int l; int res; assert(str); @@ -538,7 +538,7 @@ int mx_strtoi32(char *str, int32_t *to) int mx_strtoi64(char *str, int64_t *to) { - signed long long int ll = 0; /* avoid false maybe-uninitialized warning */ + signed long long int ll; int res; assert(str); @@ -580,28 +580,28 @@ 
char *mx_strdup_forever(char *str) return dup; } -int mx_vasprintf_forever(char **strp, const char *fmt, va_list ap) +char *mx_vasprintf_forever(const char *fmt, va_list ap) { int len; + char *strp; do { - len = vasprintf(strp, fmt, ap); + len = vasprintf(&strp, fmt, ap); } while (len < 0); - return len; + return strp; } -int mx_asprintf_forever(char **strp, const char *fmt, ...) +char *mx_asprintf_forever(const char *fmt, ...) { va_list ap; - int len; va_start(ap, fmt); - len = mx_vasprintf_forever(strp, fmt, ap); + char *strp = mx_vasprintf_forever(fmt, ap); va_end(ap); - return len; + return strp; } char *mx_hostname(void) @@ -721,11 +721,10 @@ int mx_setenvf_forever(const char *name, char *fmt, ...) assert(fmt); va_list ap; - char *value = NULL; int res; va_start(ap, fmt); - mx_vasprintf_forever(&value, fmt, ap); + char *value = mx_vasprintf_forever(fmt, ap); va_end(ap); res = mx_setenv_forever(name, value); @@ -797,7 +796,7 @@ int mx_read_first_line_from_file(char *fname, char **line) int mx_strscan_ull(char **str, unsigned long long int *to) { - unsigned long long int l = 0; /* avoid false maybe-uninitialized warning */; + unsigned long long int l; char *s; char *p; char o = 0; @@ -828,7 +827,7 @@ int mx_strscan_ull(char **str, unsigned long long int *to) int mx_strscan_ll(char **str, long long int *to) { - long long int l = 0; /* avoid false maybe-uninitialized warning */; + long long int l; char *s; char *p; char o = 0; @@ -1217,9 +1216,9 @@ char *mx_cpuset_to_str(cpu_set_t* cpuset_ptr) } cpu_high=cpu-1; if (cpu_low==cpu_high) { - mx_asprintf_forever(&str,"%d",cpu_low); + str = mx_asprintf_forever("%d", cpu_low); } else { - mx_asprintf_forever(&str,"%d-%d",cpu_low,cpu_high); + str = mx_asprintf_forever("%d-%d", cpu_low, cpu_high); } res=mx_strvec_push_str(&strvec,str); if (!res) { @@ -1316,6 +1315,24 @@ time_t mx_clock_boottime(void) { return (ts.tv_sec); } +int mx_call_external(char *helper, char **argv) { + pid_t pid = fork(); + if (pid == 0) { + 
execv(helper, argv); + mx_log_err("%s: %m", helper); + _exit(1); + } + if (pid == -1) + return -1; + int wstatus; + waitpid(pid, &wstatus, 0); + if (wstatus != 0) { + errno = EPROTO; + return -1; + } + return 0; +} + static ssize_t readall(int fd, char *buf, size_t buflen) { ssize_t len = 0; while (buflen) { @@ -1331,7 +1348,7 @@ static ssize_t readall(int fd, char *buf, size_t buflen) { return len; } -static char *mx_call_external_v(char *helper, char **argv) { +char *mx_pipe_external(char *helper, char **argv) { int pipefd[2]; int err; char buf[2048]; @@ -1339,20 +1356,20 @@ static char *mx_call_external_v(char *helper, char **argv) { if (pipe(pipefd) < 0) { return NULL; } - int pid = fork(); - if (pid < 0) { - err = errno; - close(pipefd[0]); - close(pipefd[1]); - errno = err; - return NULL; - } + pid_t pid = fork(); if (pid == 0) { close(pipefd[0]); dup2(pipefd[1], 1); execv(helper, argv); mx_log_err("%s: %m", helper); - exit(1); + _exit(1); + } + if (pid == -1) { + err = errno; + close(pipefd[0]); + close(pipefd[1]); + errno = err; + return NULL; } close(pipefd[1]); ssize_t len = readall(pipefd[0], buf, sizeof(buf)); @@ -1385,25 +1402,3 @@ static char *mx_call_external_v(char *helper, char **argv) { errno = err; return NULL; } - -char *mx_call_external(char *cmd, ...) 
{ - int len = 0; - va_list ap, ap2; - char **argv; - - va_start(ap, cmd); - va_copy(ap2, ap); - do - len++; - while (va_arg(ap, char *) != NULL); - va_end(ap); - len++; - argv=alloca(len*sizeof(*argv)); - int i=0; - argv[i++] = cmd; - do { - argv[i++] = va_arg(ap2, char *); - } while (i +#define close_range(first, last, flags) syscall(SYS_close_range, first, last, flags) +#endif #endif diff --git a/mxq_job.c b/mxq_job.c index 08217fcb..ba8a606a 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -426,7 +426,6 @@ int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, struct mxq_daemon *daemo static int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job) { struct mx_mysql_bind param = {0}; - char *host_id; int res; int idx; @@ -435,7 +434,7 @@ static int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mx assert(job->job_id); assert(job->daemon_id); - mx_asprintf_forever(&host_id, "%u", job->daemon_id); + char *host_id = mx_asprintf_forever("%u", job->daemon_id); char *query = "UPDATE" @@ -655,7 +654,7 @@ int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j) dir = mx_dirname_forever(j->job_stdout); - mx_asprintf_forever(&j->tmp_stdout, "%s/mxq.%u.%lu.%lu.%s.%s.%d.stdout.tmp", + j->tmp_stdout = mx_asprintf_forever("%s/mxq.%u.%lu.%lu.%s.%s.%d.stdout.tmp", dir, g->user_uid, g->group_id, j->job_id, j->host_hostname, j->daemon_name, j->host_pid); } @@ -669,7 +668,7 @@ int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j) } dir = mx_dirname_forever(j->job_stderr); - mx_asprintf_forever(&j->tmp_stderr, "%s/mxq.%u.%lu.%lu.%s.%s.%d.stderr.tmp", + j->tmp_stderr = mx_asprintf_forever("%s/mxq.%u.%lu.%lu.%s.%s.%d.stderr.tmp", dir, g->user_uid, g->group_id, j->job_id, j->host_hostname, j->daemon_name, j->host_pid); } @@ -803,7 +802,7 @@ int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **job return res; } -int mxq_unload_job_from_server(struct mx_mysql *mysql, struct mxq_daemon *daemon, 
uint64_t job_id) { +int mxq_unload_job_from_server(struct mx_mysql *mysql, uint64_t job_id) { /* set a job from LOADED back to INQ. This needs to reset what * mxq_assign_job_from_group_to_daemon() and mxq_set_job_status_loaded_on_server() diff --git a/mxq_job.h b/mxq_job.h index f6a9c314..15699b65 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -93,5 +93,5 @@ int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job); int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j); int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job); int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **jobs_result, struct mxq_daemon *daemon); -int mxq_unload_job_from_server(struct mx_mysql *mysql, struct mxq_daemon *daemon, uint64_t job_id); +int mxq_unload_job_from_server(struct mx_mysql *mysql, uint64_t job_id); #endif diff --git a/mxq_reaper.c b/mxq_reaper.c new file mode 100644 index 00000000..f843980a --- /dev/null +++ b/mxq_reaper.c @@ -0,0 +1,104 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static const char REAPER_PNAME[] = "mxqd reaper"; + +__attribute__((noreturn)) static void die(char *msg) { + perror(msg); + _exit(1); +} + +int main(int argc, char **argv) { + + if (argc < 5 || strcmp(argv[3], "--") != 0) { + fprintf(stderr, "usage: %s UID SPOOLFILENAME -- CMD [ARGS...]\n", argv[0]); + exit(1); + } + + uid_t uid = atoi(argv[1]); + char *spoolfilename = argv[2]; + char **user_argv = &argv[4]; + + pid_t user_pid; + int user_status = -1; + struct rusage user_rusage; + struct timeval user_time; + + struct timeval starttime; + struct timeval endtime; + + if (prctl(PR_SET_NAME, REAPER_PNAME, NULL, NULL, NULL) == -1) + die("PR_SET_NAME"); + user_pid = fork(); + if (user_pid == 0) { + if (setreuid(uid, uid) == -1) + die("setreuid"); + execvp(user_argv[0], 
user_argv); + die(user_argv[0]); + } + if (user_pid == -1) + die("fork"); + if (gettimeofday(&starttime, NULL) == -1) + die("gettimeofday"); + while (1) { + int status; + pid_t pid = wait(&status); + if (pid < 0 && errno == ECHILD) + break; + if (pid == user_pid) + user_status = status; + } + if (gettimeofday(&endtime, NULL) == -1) + die("gettimeofday"); + timersub(&endtime, &starttime, &user_time); + if (getrusage(RUSAGE_CHILDREN, &user_rusage) == -1) + die("getrusage"); + + if (user_time.tv_sec<30) { + int wait=30-user_time.tv_sec; + sleep(wait); + } + + char *tmpfilename; + if (asprintf(&tmpfilename, "%s.tmp", spoolfilename) == -1) + die(""); + + FILE *out = fopen(tmpfilename,"w"); + if (out == NULL) + die(tmpfilename); + fprintf(out,"1 %d %d %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld\n", + getpid(), + user_status, + user_time.tv_sec, user_time.tv_usec, + user_rusage.ru_utime.tv_sec, user_rusage.ru_utime.tv_usec, + user_rusage.ru_stime.tv_sec, user_rusage.ru_stime.tv_usec, + user_rusage.ru_maxrss, + user_rusage.ru_ixrss, + user_rusage.ru_idrss, + user_rusage.ru_isrss, + user_rusage.ru_minflt, + user_rusage.ru_majflt, + user_rusage.ru_nswap, + user_rusage.ru_inblock, + user_rusage.ru_oublock, + user_rusage.ru_msgsnd, + user_rusage.ru_msgrcv, + user_rusage.ru_nsignals, + user_rusage.ru_nvcsw, + user_rusage.ru_nivcsw + ); + fflush(out); + fsync(fileno(out)); + fclose(out); + if (rename(tmpfilename, spoolfilename) == -1) + die(spoolfilename); + return 0; +} diff --git a/mxqd.c b/mxqd.c index bdb31eaa..9a50497b 100644 --- a/mxqd.c +++ b/mxqd.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -323,7 +324,8 @@ static int cpuset_init(struct mxq_server *server) } static void read_hostconfig_retry(struct keywordset *kws) { - char *line = mx_call_external("/usr/sbin/hostconfig", NULL); + char *argv[] = { "/usr/sbin/hostconfig", NULL }; + char *line = mx_pipe_external("/usr/sbin/hostconfig", argv); if (!line) { 
mx_log_err("hostconfig: %m"); exit(1); @@ -335,7 +337,8 @@ static void read_hostconfig_retry(struct keywordset *kws) { static char gpu_setup_script[] = LIBEXECDIR "/mxq/gpu-setup"; static int get_gpus(void) { - char *line = mx_call_external(gpu_setup_script, "init", NULL); + char *argv[] = { gpu_setup_script, "init", NULL }; + char *line = mx_pipe_external(gpu_setup_script, argv); if (!line) { mx_log_err("gpu-setup init: %m"); exit(1); @@ -666,7 +669,7 @@ static int server_init(struct mxq_server *server, int argc, char *argv[]) return -EX_UNAVAILABLE; } - mx_asprintf_forever(&server->finished_jobsdir,"%s/%s",MXQ_FINISHED_JOBSDIR,server->daemon_name); + server->finished_jobsdir = mx_asprintf_forever("%s/%s", MXQ_FINISHED_JOBSDIR, server->daemon_name); res=mx_mkdir_p(server->finished_jobsdir,0700); if (res<0) { mx_log_err("MAIN: mkdir %s failed: %m. Exiting.",MXQ_FINISHED_JOBSDIR); @@ -746,7 +749,7 @@ static int server_init(struct mxq_server *server, int argc, char *argv[]) server->starttime = pps->starttime; mx_proc_pid_stat_free_content(pps); - mx_asprintf_forever(&server->host_id, "%s-%llx-%x", server->boot_id, server->starttime, getpid()); + server->host_id = mx_asprintf_forever("%s-%llx-%x", server->boot_id, server->starttime, getpid()); mx_setenv_forever("MXQ_HOSTID", server->host_id); server->slots = arg_threads_total; @@ -809,31 +812,131 @@ static int server_init(struct mxq_server *server, int argc, char *argv[]) return 0; } -static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job) +static int mxq_redirect_open(char *fname) { - struct mxq_server *server; - struct mxq_group *group; - struct passwd *passwd; + int fh; int res; + + int flags = O_WRONLY|O_CREAT|O_NOFOLLOW|O_TRUNC; + mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH; + + + if (!fname) { + fname = "/dev/null"; + } else if (!mx_streq(fname, "/dev/null")) { + res = unlink(fname); + if (res == -1 && errno != ENOENT) { + mx_log_err("%s: unlink() failed: %m", fname); + 
return -2; + } + flags |= O_EXCL; + } + + fh = open(fname, flags, mode); + if (fh == -1) { + mx_log_err("open() failed: %m"); + } + + return fh; + +} + +static int mxq_redirect(char *fname, int fd) +{ int fh; - struct rlimit rlim; + int res; - assert(job); - assert(glist); - assert(glist->user); - assert(glist->user->server); + fh = mxq_redirect_open(fname); + if (fh < 0) + return -1; - server = glist->user->server; - group = &glist->group; + res = mx_dup2_close_both(fh, fd); + if (res < 0) + return -2; + + return 0; +} + +static int mxq_redirect_output(char *stdout_fname, char *stderr_fname) +{ + int res; + + res = mxq_redirect(stderr_fname, STDERR_FILENO); + if (res < 0) { + return -1; + } + + if (stdout_fname == stderr_fname) { + res = mx_dup2_close_new(STDERR_FILENO, STDOUT_FILENO); + if( res < 0) { + return -2; + } + return 0; + } + + res = mxq_redirect(stdout_fname, STDOUT_FILENO); + if (res < 0) { + return -3; + } + + return 0; +} + +static int mxq_redirect_input(char *stdin_fname) +{ + int fh; + int res; + + fh = open(stdin_fname, O_RDONLY|O_NOFOLLOW); + if (fh == -1) { + mx_log_err("open() failed: %m"); + return -1; + } + + res = mx_dup2_close_both(fh, STDIN_FILENO); + if (res < 0) { + return -2; + } + + return 1; +} + +static const char REAPER_PNAME[] = "mxqd reaper"; + +static int is_reaper(pid_t pid) { + char comm[16]; + if (mx_proc_get_comm(pid, comm) == NULL) + return 0; + if (strcmp(comm, REAPER_PNAME) == 0) + return 1; + else + return 0; +} + +static void exec_reaper(struct mxq_server *server,struct mxq_group_list *glist, struct mxq_job *job) { + struct mxq_group *group = &glist->group; + + if (prctl(PR_SET_NAME, REAPER_PNAME, NULL, NULL, NULL) ==-1) { + mx_log_err("reaper_process set name: %m"); + return; + } + if (setsid() == -1) { + mx_log_err("reaper_process setsid: %m"); + return; + } + if (prctl(PR_SET_CHILD_SUBREAPER, 1) == -1) { + mx_log_err("set subreaper: %m"); + return; + } sigprocmask(SIG_UNBLOCK,&all_signals,NULL); 
signal(SIGPIPE,SIG_DFL); - passwd = getpwuid(group->user_uid); + struct passwd *passwd = getpwuid(group->user_uid); if (!passwd) { mx_log_err("job=%s(%d):%lu:%lu getpwuid(): %m", group->user_name, group->user_uid, group->group_id, job->job_id); - return 0; + return; } if (!mx_streq(passwd->pw_name, group->user_name)) { @@ -850,7 +953,7 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job) if (!passwd) { mx_log_err("job=%s(%d):%lu:%lu getpwnam(): %m", group->user_name, group->user_uid, group->group_id, job->job_id); - return 0; + return; } if (passwd->pw_uid != group->user_uid) { mx_log_fatal("job=%s(%d):%lu:%lu user_name=%s does not map to uid=%d but to pw_uid=%d. Aborting Child execution.", @@ -862,17 +965,14 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job) group->user_uid, passwd->pw_uid); - return 0; + return; } } - /* prepare environment */ - - res = clearenv(); - if (res != 0) { + if (clearenv() != 0) { mx_log_err("job=%s(%d):%lu:%lu clearenv(): %m", group->user_name, group->user_uid, group->group_id, job->job_id); - return 0; + return; } mx_setenv_forever("USER", group->user_name); @@ -895,52 +995,47 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job) if (group->job_tmpdir_size == 0) { mx_setenv_forever("TMPDIR", server->initial_tmpdir); } else { - char *mxq_job_tmpdir; - mx_asprintf_forever(&mxq_job_tmpdir, "%s/%lu", MXQ_JOB_TMPDIR_MNTDIR, job->job_id); + char *mxq_job_tmpdir = mx_asprintf_forever("%s/%lu", MXQ_JOB_TMPDIR_MNTDIR, job->job_id); mx_setenv_forever("MXQ_JOB_TMPDIR", mxq_job_tmpdir); mx_setenv_forever("TMPDIR", mxq_job_tmpdir); - free(mxq_job_tmpdir); + // not needed before exec() or exit(): free(mxq_job_tmpdir); } if (group->job_gpu) { - char *pid; - char *uid; - mx_asprintf_forever(&pid, "%d", job->host_pid); - mx_asprintf_forever(&uid, "%u", group->user_uid); - char *gpu_uuid = mx_call_external(gpu_setup_script, "job-init", pid, uid, NULL); + char 
*argv[] = { + gpu_setup_script, + "job-init", + mx_asprintf_forever("%d", job->host_pid), + mx_asprintf_forever("%u", group->user_uid), + NULL + }; + char *gpu_uuid = mx_pipe_external(gpu_setup_script, argv); if (!gpu_uuid) { mx_log_err("gpu-setup job-init: %m"); exit(1); } mx_setenv_forever("CUDA_VISIBLE_DEVICES", gpu_uuid); - free(gpu_uuid); - free(pid); - free(uid); + // not needed before exec() or exit(): free(gpu_uuid); free(argv[2]); free(argv[3]); } - fh = open("/proc/self/loginuid", O_WRONLY|O_TRUNC); + int fh = open("/proc/self/loginuid", O_WRONLY|O_TRUNC); if (fh == -1) { mx_log_err("job=%s(%d):%lu:%lu open(%s) failed: %m", group->user_name, group->user_uid, group->group_id, job->job_id, "/proc/self/loginuid"); - return 0; + return; } dprintf(fh, "%d", group->user_uid); close(fh); - /* set memory limits */ + struct rlimit rlim; rlim.rlim_cur = group->job_memory*1024*1024; rlim.rlim_max = group->job_memory*1024*1024; - - res = setrlimit(RLIMIT_DATA, &rlim); - if (res == -1) + if (setrlimit(RLIMIT_DATA, &rlim) == -1) mx_log_err("job=%s(%d):%lu:%lu setrlimit(RLIMIT_DATA, ...) failed: %m", group->user_name, group->user_uid, group->group_id, job->job_id); - /* disable core files */ rlim.rlim_cur = 0; rlim.rlim_cur = 0; - - res = setrlimit(RLIMIT_CORE, &rlim); - if (res == -1) + if (setrlimit(RLIMIT_CORE, &rlim) == -1) mx_log_err("job=%s(%d):%lu:%lu setrlimit(RLIMIT_CORE, ...) failed: %m", group->user_name, group->user_uid, group->group_id, job->job_id); @@ -950,180 +1045,62 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job) rlim.rlim_cur = group->job_time*60; rlim.rlim_cur = group->job_time*63; - res = setrlimit(RLIMIT_CPU, &rlim); - if (res == -1) + if (setrlimit(RLIMIT_CPU, &rlim) == -1) mx_log_err("job=%s(%d):%lu:%lu setrlimit(RLIMIT_CPU, ...) 
failed: %m", group->user_name, group->user_uid, group->group_id, job->job_id); } - res = initgroups(passwd->pw_name, group->user_gid); - if (res == -1) { + if (initgroups(passwd->pw_name, group->user_gid) == -1) { mx_log_err("job=%s(%d):%lu:%lu initgroups() failed: %m", group->user_name, group->user_uid, group->group_id, job->job_id); - return 0; + return; } - - res = setregid(group->user_gid, group->user_gid); - if (res == -1) { + if (setregid(group->user_gid, group->user_gid) == -1) { mx_log_err("job=%s(%d):%lu:%lu setregid(%d, %d) failed: %m", group->user_name, group->user_uid, group->group_id, job->job_id, group->user_gid, group->user_gid); - return 0; + return; } - - res = setreuid(group->user_uid, group->user_uid); - if (res == -1) { + if (setreuid(-1, group->user_uid) == -1) { mx_log_err("job=%s(%d):%lu:%lu setreuid(%d, %d) failed: %m", group->user_name, group->user_uid, group->group_id, job->job_id, group->user_uid, group->user_uid); - return 0; + return; } - res = chdir(job->job_workdir); - if (res == -1) { + if (chdir(job->job_workdir) == -1) { mx_log_err("job=%s(%d):%lu:%lu chdir(%s) failed: %m", group->user_name, group->user_uid, group->group_id, job->job_id, job->job_workdir); - return 0; + return; } umask(job->job_umask); - res=sched_setaffinity(0,sizeof(job->host_cpu_set),&job->host_cpu_set); - if (res<0) mx_log_warning("sched_setaffinity: $m"); - - return 1; -} - -/**********************************************************************/ - -static int mxq_redirect_open(char *fname) -{ - int fh; - int res; - - int flags = O_WRONLY|O_CREAT|O_NOFOLLOW|O_TRUNC; - mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH; - - - if (!fname) { - fname = "/dev/null"; - } else if (!mx_streq(fname, "/dev/null")) { - res = unlink(fname); - if (res == -1 && errno != ENOENT) { - mx_log_err("%s: unlink() failed: %m", fname); - return -2; - } - flags |= O_EXCL; - } - - fh = open(fname, flags, mode); - if (fh == -1) { - mx_log_err("open() failed: %m"); - } - - 
return fh; - -} - -static int mxq_redirect(char *fname, int fd) -{ - int fh; - int res; - - fh = mxq_redirect_open(fname); - if (fh < 0) - return -1; - - res = mx_dup2_close_both(fh, fd); - if (res < 0) - return -2; - - return 0; -} - -static int mxq_redirect_output(char *stdout_fname, char *stderr_fname) -{ - int res; - - res = mxq_redirect(stderr_fname, STDERR_FILENO); - if (res < 0) { - return -1; - } - - if (stdout_fname == stderr_fname) { - res = mx_dup2_close_new(STDERR_FILENO, STDOUT_FILENO); - if( res < 0) { - return -2; - } - return 0; - } - - res = mxq_redirect(stdout_fname, STDOUT_FILENO); - if (res < 0) { - return -3; - } - - return 0; -} - -static int mxq_redirect_input(char *stdin_fname) -{ - int fh; - int res; - - fh = open(stdin_fname, O_RDONLY|O_NOFOLLOW); - if (fh == -1) { - mx_log_err("open() failed: %m"); - return -1; - } - - res = mx_dup2_close_both(fh, STDIN_FILENO); - if (res < 0) { - return -2; - } - - return 1; -} - -static int user_process(struct mxq_group_list *glist, struct mxq_job *job) -{ - int res; - char **argv; - - struct mxq_group *group; - - group = &glist->group; - - res = init_child_process(glist, job); - if (!res) - return(-1); + if (sched_setaffinity(0,sizeof(job->host_cpu_set),&job->host_cpu_set) == -1) + mx_log_warning("sched_setaffinity: $m"); mxq_job_set_tmpfilenames(group, job); - res = mxq_redirect_input("/dev/null"); - if (res < 0) { - mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_input() failed (%d): %m", + if (mxq_redirect_input("/dev/null") < 0) { + mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_input() failed: %m", group->user_name, group->user_uid, group->group_id, - job->job_id, - res); - return(res); + job->job_id); + return; } - res = mxq_redirect_output(job->tmp_stdout, job->tmp_stderr); - if (res < 0) { - mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_output() failed (%d): %m", + if (mxq_redirect_output(job->tmp_stdout, job->tmp_stderr) < 0) { + mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_output() failed: %m", 
group->user_name, group->user_uid, group->group_id, - job->job_id, - res); - return(res); + job->job_id); + return; } - argv = mx_strvec_from_str(job->job_argv_str); + char **argv = mx_strvec_from_str(job->job_argv_str); if (!argv) { mx_log_err("job=%s(%d):%lu:%lu Can't recaculate commandline. str_to_strvev(%s) failed: %m", group->user_name, @@ -1131,146 +1108,42 @@ static int user_process(struct mxq_group_list *glist, struct mxq_job *job) group->group_id, job->job_id, job->job_argv_str); - return -errno; + return; } - res = execvp(argv[0], argv); + int argc = 0; + while (argv[argc] != NULL) + argc++; + + char **new_argv = mx_calloc_forever(argc+4+1, sizeof(char *)); + new_argv[0] = LIBEXECDIR "/mxq/mxq_reaper"; + new_argv[1] = mx_asprintf_forever("%d", group->user_uid); + new_argv[2] = mx_asprintf_forever("%s/%lu.stat", server->finished_jobsdir, job->job_id); + new_argv[3] = "--"; + for (int i = 0; i < argc ; i++) + new_argv[i+4] = argv[i]; + new_argv[argc+4] = NULL; + // not needed before exec() or exit: free(argv); free(argv[1]); free(argv[2]); + + if (setuid(0) == -1) { + mx_log_err("job=%s(%d):%lu:%lu setuid(0) failed: %m", + group->user_name, + group->user_uid, + group->group_id, + job->job_id); + return; + } + + execvp(new_argv[0], new_argv); mx_log_err("job=%s(%d):%lu:%lu execvp(\"%s\", ...): %m", group->user_name, group->user_uid, group->group_id, job->job_id, argv[0]); - return res; } -static const char REAPER_PNAME[] = "mxqd reaper"; - -static int is_reaper(pid_t pid) { - char comm[16]; - if (mx_proc_get_comm(pid, comm) == NULL) - return 0; - if (strcmp(comm, REAPER_PNAME) == 0) - return 1; - else - return 0; -} - -static int reaper_process(struct mxq_server *server,struct mxq_group_list *glist, struct mxq_job *job) { - pid_t pid; - struct rusage rusage; - int status = 0; - pid_t waited_pid; - int waited_status; - struct timeval now; - struct timeval realtime; - _mx_cleanup_free_ char *finished_job_filename=NULL; - _mx_cleanup_free_ char 
*finished_job_tmpfilename=NULL; - FILE *out; - int res; - - struct mxq_group *group; - - group = &glist->group; - - res = prctl(PR_SET_NAME, REAPER_PNAME, NULL, NULL, NULL); - if (res < 0) { - mx_log_err("reaper_process set name: %m"); - return res; - } - - res = setsid(); - if (res < 0) { - mx_log_err("reaper_process setsid: %m"); - return res; - } - - res = prctl(PR_SET_CHILD_SUBREAPER, 1); - if (res < 0) { - mx_log_err("set subreaper: %m"); - return res; - } - - pid = fork(); - if (pid < 0) { - mx_log_err("fork: %m"); - return pid; - } else if (pid == 0) { - mx_log_debug("starting user process."); - res = user_process(glist, job); - _exit(EX__MAX+1); - } - gettimeofday(&job->stats_starttime, NULL); - - while (1) { - waited_pid = wait(&waited_status); - if (waited_pid < 0) { - if (errno==ECHILD) { - break; - } else { - mx_log_warning("reaper: wait: %m"); - sleep(1); - } - } - if (waited_pid == pid) { - status = waited_status; - } - } - gettimeofday(&now, NULL); - timersub(&now, &job->stats_starttime, &realtime); - - if (realtime.tv_sec<30) { - int wait=30-realtime.tv_sec; - mx_log_warning("user process finished to fast (%ld seconds) : delaying termination for %d seconds",realtime.tv_sec,wait); - sleep(wait); - } - - res = getrusage(RUSAGE_CHILDREN, &rusage); - if (res < 0) { - mx_log_err("reaper: getrusage: %m"); - return(res); - } - - mx_asprintf_forever(&finished_job_filename, "%s/%lu.stat", server->finished_jobsdir, job->job_id); - mx_asprintf_forever(&finished_job_tmpfilename, "%s.tmp", finished_job_filename); - - out=fopen(finished_job_tmpfilename,"w"); - if (!out) { - mx_log_fatal("%s: %m",finished_job_tmpfilename); - return (-errno); - } - - fprintf(out,"1 %d %d %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld\n", - getpid(), - status, - realtime.tv_sec,realtime.tv_usec, - rusage.ru_utime.tv_sec,rusage.ru_utime.tv_usec, - rusage.ru_stime.tv_sec,rusage.ru_stime.tv_usec, - rusage.ru_maxrss, - rusage.ru_ixrss, - rusage.ru_idrss, 
- rusage.ru_isrss, - rusage.ru_minflt, - rusage.ru_majflt, - rusage.ru_nswap, - rusage.ru_inblock, - rusage.ru_oublock, - rusage.ru_msgsnd, - rusage.ru_msgrcv, - rusage.ru_nsignals, - rusage.ru_nvcsw, - rusage.ru_nivcsw - ); - fflush(out); - fsync(fileno(out)); - fclose(out); - res=rename(finished_job_tmpfilename,finished_job_filename); - if (res<0) { - mx_log_fatal("rename %s: %m",finished_job_tmpfilename); - return(res); - } - return(0); -} +static char tmpdir_script[] = LIBEXECDIR "/mxq/tmpdir-setup"; static unsigned long start_job(struct mxq_group_list *glist) { @@ -1284,11 +1157,8 @@ static unsigned long start_job(struct mxq_group_list *glist) struct mxq_daemon *daemon; - static char create_job_tmpdir_script[] = LIBEXECDIR "/mxq/create_job_tmpdir"; - pid_t pid; int res; - int status; assert(glist); assert(glist->user); @@ -1305,30 +1175,22 @@ static unsigned long start_job(struct mxq_group_list *glist) } if (group->job_tmpdir_size > 0) { - mx_mysql_disconnect(server->mysql); - pid = fork(); - if (pid==0) { - char *argv[2]; - char *envp[4]; - argv[0] = create_job_tmpdir_script, - argv[1] = NULL; - mx_asprintf_forever(&envp[0], "MXQ_JOBID=%lu", job->job_id); - mx_asprintf_forever(&envp[1], "MXQ_SIZE=%u", group->job_tmpdir_size); - mx_asprintf_forever(&envp[2], "MXQ_UID=%d", group->user_uid); - envp[3] = NULL; - execve(create_job_tmpdir_script,argv,envp); - mx_log_fatal("exec %s : %m",create_job_tmpdir_script); - exit(1); - } - mx_mysql_connect_forever(&(server->mysql)); - if (pid < 0) { - mx_log_err("fork: %m"); - mxq_unload_job_from_server(server->mysql, daemon, job->job_id); - return(0); - } - waitpid(pid, &status, 0); - if (status) { - mxq_unload_job_from_server(server->mysql, daemon, job->job_id); + char *argv[] = { + tmpdir_script, + "create", + mx_asprintf_forever("%lu", job->job_id), + mx_asprintf_forever("%u", group->job_tmpdir_size), + mx_asprintf_forever("%d", group->user_uid), + NULL + }; + int status = mx_call_external(tmpdir_script, argv); + 
free(argv[2]); + free(argv[3]); + free(argv[4]); + if (status == -1) { + mx_log_err("create job tmpdir: %m"); + mxq_unload_job_from_server(server->mysql, job->job_id); + sleep(30); return 0; } } @@ -1337,36 +1199,27 @@ static unsigned long start_job(struct mxq_group_list *glist) mx_free_null(job->host_cpu_set_str); job->host_cpu_set_str = mx_cpuset_to_str(&job->host_cpu_set); - mx_mysql_disconnect(server->mysql); - pid = fork(); + if (pid == 0) { + job->host_pid = getpid(); + mx_log_debug("starting reaper process."); + // we would like to use CLOSE_RANGE_CLOEXEC, but would need Linux 5.11 for that + if (close_range(3, ~0u, 0) == -1) { + mx_log_fatal("close_range: %m"); + _exit(1); + } + exec_reaper(server, glist, job); + _exit(EX__MAX+1); + } if (pid < 0) { mx_log_err("fork: %m"); cpuset_clear_running(&job->host_cpu_set,&server->cpu_set_available); - mxq_unload_job_from_server(server->mysql, daemon, job->job_id); + mxq_unload_job_from_server(server->mysql, job->job_id); return 0; - } else if (pid == 0) { - job->host_pid = getpid(); - - mx_log_debug("starting reaper process."); - mx_funlock_nodelete(server->flock); - server->flock = NULL; - mx_mysql_finish(&server->mysql); - - res = reaper_process(server, glist, job); - - mxq_job_free_content(job); - - mx_log_debug("shutting down reaper, bye bye."); - mx_log_finish(); - server_free(server); - _exit(res<0 ? 
EX__MAX+1 : 0); } gettimeofday(&job->stats_starttime, NULL); - mx_mysql_connect_forever(&(server->mysql)); - job->host_pid = pid; job->host_slots = glist->slots_per_job; res = mxq_set_job_status_running(server->mysql, job); @@ -1401,7 +1254,7 @@ static unsigned long start_job(struct mxq_group_list *glist) return 1; } -static int can_start_job(struct mxq_group_list *group, unsigned long df_scratch, struct mxq_server *server, long slots_to_start) { +static int can_start_job(struct mxq_group_list *group, unsigned long df_scratch, struct mxq_server *server, unsigned long slots_to_start) { /* Can we start a(nother) job from this group */ if (group->jobs_running >= group->group.group_jobs) return 0; @@ -1426,7 +1279,7 @@ static int can_start_job_for_user(struct mxq_user_list *user, unsigned long df_s return 0; } -static unsigned long start_user(struct mxq_user_list *ulist, long slots_to_start, unsigned long df_scratch) +static unsigned long start_user(struct mxq_user_list *ulist, unsigned long slots_to_start, unsigned long df_scratch) { struct mxq_server *server; struct mxq_group_list *glist; @@ -1442,7 +1295,7 @@ static unsigned long start_user(struct mxq_user_list *ulist, long slots_to_start assert(slots_to_start <= server->slots - server->slots_running); - mx_log_debug(" user=%s(%d) slots_to_start=%ld :: trying to start jobs for user.", + mx_log_debug(" user=%s(%d) slots_to_start=%lu :: trying to start jobs for user.", group->user_name, group->user_uid, slots_to_start); for (glist = ulist->groups; glist ; glist = glist->next) { @@ -1450,7 +1303,7 @@ static unsigned long start_user(struct mxq_user_list *ulist, long slots_to_start group = &glist->group; if (can_start_job(glist, df_scratch, server, slots_to_start)) { - mx_log_info(" group=%s(%d):%lu slots_to_start=%ld slots_per_job=%lu :: trying to start job for group.", + mx_log_info(" group=%s(%d):%lu slots_to_start=%lu slots_per_job=%lu :: trying to start job for group.", group->user_name, group->user_uid, 
group->group_id, slots_to_start, glist->slots_per_job); if (start_job(glist)) { int slots_started = glist->slots_per_job; @@ -1797,8 +1650,6 @@ static int killall(struct mxq_server *server) struct mxq_group_list *glist; struct mxq_job_list *jlist; - struct mxq_group *group; - struct ppidcache *ppidcache = ppidcache_new(); ppidcache_scan(ppidcache); @@ -1806,7 +1657,6 @@ static int killall(struct mxq_server *server) for (ulist = server->users; ulist; ulist = ulist->next) { for (glist = ulist->groups; glist; glist = glist->next) { - group = &glist->group; for (jlist = glist->jobs; jlist; jlist = jlist->next) killstate_event(ppidcache, jlist, KILLEVENT_CANCEL); } @@ -1870,7 +1720,6 @@ static int killall_over_memory(struct ppidcache *ppidcache, struct mxq_server *s struct mx_proc_tree *ptree = NULL; struct mx_proc_info *pinfo; - long pagesize; int res; assert(server); @@ -1881,12 +1730,6 @@ static int killall_over_memory(struct ppidcache *ppidcache, struct mxq_server *s /* limit killing to every >= 10 seconds */ mx_within_rate_limit_or_return(10, 0); - pagesize = sysconf(_SC_PAGESIZE); - if (!pagesize) { - mx_log_warning("killall_over_memory(): Can't get _SC_PAGESIZE. 
Assuming 4096."); - pagesize = 4096; - } - res = mx_proc_tree(&ptree); if (res < 0) { mx_log_err("killall_over_memory(): Reading process tree failed: %m"); @@ -1930,9 +1773,7 @@ static int killall_cancelled(struct ppidcache *ppidcache, struct mxq_server *ser struct mxq_user_list *ulist; struct mxq_group_list *glist; struct mxq_job_list *jlist; - struct mxq_group *group; - struct mxq_job *job; assert(server); @@ -1948,7 +1789,6 @@ static int killall_cancelled(struct ppidcache *ppidcache, struct mxq_server *ser group->user_name, group->user_uid, group->group_id); for (jlist = glist->jobs; jlist; jlist = jlist->next) { - job = &jlist->job; killstate_event(ppidcache, jlist, KILLEVENT_CANCEL); } } @@ -2021,39 +1861,29 @@ static void rename_outfiles(struct mxq_server *server, struct mxq_group *group, } } -static char *job_tmpdir_path(unsigned long job_id) { - char *pathname; - mx_asprintf_forever(&pathname, "%s/%lu", MXQ_JOB_TMPDIR_MNTDIR, job_id); - return pathname; -} - -static int unmount_and_remove(char *pathname) { - int res; - res = rmdir(pathname); - if (res && errno==EBUSY) { - res = umount(pathname); - if (res == 0) { - res = rmdir(pathname); - } - } - return res; -} - static void unmount_job_tmpdir(unsigned long job_id) { - char *pathname; - pathname=job_tmpdir_path(job_id); - if (unmount_and_remove(pathname)) { - mx_log_warning("failed to unmount/remove stale job tmpdir %s: %m", pathname); - } - free(pathname); + char *argv[] = { + tmpdir_script, + "cleanup", + mx_asprintf_forever("%lu", job_id), + NULL + }; + int res = mx_call_external(tmpdir_script, argv); + free(argv[2]); + if (res == -1) + mx_log_err("cleanup job tmpdir: %m"); } static void release_gpu(struct mxq_server *server, struct mxq_group *group, struct mxq_job *job) { if (group->job_gpu) { - char *pid; - mx_asprintf_forever(&pid, "%d", job->host_pid); - char *gpu_uuid = mx_call_external(gpu_setup_script, "job-release", pid, NULL); - free(pid); + char *argv[] = { + gpu_setup_script, + "job-release", 
+ mx_asprintf_forever("%d", job->host_pid), + NULL + }; + char *gpu_uuid = mx_pipe_external(gpu_setup_script, argv); + free(argv[2]); if (!gpu_uuid) { mx_log_err("gpu-setup job-release: %m"); exit(1); @@ -2109,13 +1939,6 @@ static int job_is_lost(struct mxq_server *server,struct mxq_group *group, struct return cnt; } -static char *fspool_get_filename (struct mxq_server *server,long unsigned int job_id) -{ - char *fspool_filename; - mx_asprintf_forever(&fspool_filename,"%s/%lu.stat",server->finished_jobsdir,job_id); - return fspool_filename; -} - static int fspool_process_file(struct mxq_server *server,char *filename, uint64_t job_id) { FILE *in; int res; @@ -2244,7 +2067,7 @@ static int fspool_scan(struct mxq_server *server) { } for (i=0;ifinished_jobsdir,namelist[i]->d_name); + filename = mx_asprintf_forever("%s/%s", server->finished_jobsdir, namelist[i]->d_name); if (fspool_is_valid_name_parse(namelist[i]->d_name,&job_id)) { res=fspool_process_file(server,filename,job_id); if (res>0) { @@ -2277,8 +2100,7 @@ static int file_exists(char *name) { } static int fspool_file_exists(struct mxq_server *server,uint64_t job_id) { - _mx_cleanup_free_ char *fspool_filename=NULL; - fspool_filename=fspool_get_filename(server,job_id); + _mx_cleanup_free_ char *fspool_filename = mx_asprintf_forever("%s/%lu.stat", server->finished_jobsdir, job_id); return file_exists(fspool_filename); } @@ -2609,8 +2431,9 @@ static int recover_from_previous_crash(struct mxq_server *server) mx_log_err("recover: server_fspool_scan: %m"); return res; } - if (res>0) - mx_log_info("recover: processed %d finished jobs from fspool",res); + /* Do not log slots returned, because this value is misleading with --recover-only as the current + * server may have far fewer slots than the previous server started with memory from + * mxqdctl-hostconfig */ res=lost_scan(server); if (res<0) { @@ -2882,7 +2705,7 @@ int main(int argc, char *argv[])
mx_log_info("-------------------------------------------------------------"); mx_log_info(" Reexecuting %s", argv[0]); mx_log_info("-------------------------------------------------------------"); - res = execvp(argv[0], argv); + execvp(argv[0], argv); mx_log_fatal("execvp(\"%s\", ...): %m", argv[0]); } diff --git a/mxqd_control.c b/mxqd_control.c index 47903673..d51ffebc 100644 --- a/mxqd_control.c +++ b/mxqd_control.c @@ -206,7 +206,7 @@ struct mxq_job_list *server_get_job_list_by_pid(struct mxq_server *server, pid_t for (glist = ulist->groups; glist; glist = glist->next) { for (jlist = glist->jobs; jlist; jlist = jlist->next) { job = &jlist->job; - if (job->host_pid == pid) + if (job->host_pid == (unsigned)pid) return jlist; } } diff --git a/mxqset.c b/mxqset.c index 18cf375a..126471f6 100644 --- a/mxqset.c +++ b/mxqset.c @@ -186,16 +186,16 @@ static error_t parser (int key, char *arg, struct argp_state *state) { } static const struct argp_option options[] = { - {"closed", 10, NULL, 0, NULL}, - {"open", 11, NULL, 0, NULL}, - {"blacklist", 13, "", 0, NULL}, - {"whitelist", 15, "", 0, NULL}, + { .name = "closed", .key = 10 }, + { .name = "open", .key = 11 }, + { .name = "blacklist", .key = 13 }, + { .name = "whitelist", .key = 15 }, {0} }; -static const struct argp argp = { options, parser, NULL, NULL }; +static const struct argp argp = { .options = options, .parser = parser }; -static __attribute__ ((noreturn)) void exit_usage(char *argv0) { +static __attribute__ ((noreturn)) void exit_usage(void) { fprintf(stderr, "usage: %s group GID [group-options]\n" "\n" @@ -210,14 +210,12 @@ static __attribute__ ((noreturn)) void exit_usage(char *argv0) { } int main(int argc, char **argv) { - - char *argv0=argv[0]; struct mx_mysql *mysql = NULL; int groupid; uid_t uid = getuid(); if (argc<3 || strcmp(argv[1],"group") != 0) - exit_usage(argv0); + exit_usage(); groupid=atoi(argv[2]); int sts; @@ -225,7 +223,7 @@ int main(int argc, char **argv) { sts=argp_parse (&argp, argc-3, 
&argv[3], ARGP_PARSE_ARGV0|ARGP_SILENT, NULL, &opts); if (sts) - exit_usage(argv0); + exit_usage(); assert(mx_mysql_initialize(&mysql) == 0); mx_mysql_option_set_default_file(mysql, MXQ_MYSQL_DEFAULT_FILE); diff --git a/mxqsub.c b/mxqsub.c index af901d67..7ed7e69a 100644 --- a/mxqsub.c +++ b/mxqsub.c @@ -67,7 +67,7 @@ static void print_usage(void) "\n" " -j, --processors=NUMBER set number of processors (default: 1)\n" " -m, --memory=SIZE set amount of memory (default: 2G)\n" - " --tmpdir=SIZE set size of MXQ_JOB_TMPDIR (default: 0)\n" + " --tmpdir=SIZE set size of MXQ_JOB_TMPDIR (default: 100G)\n" " --gpu request a gpu\n" " --blacklist=STRING set list of blacklisted servers (default: '')\n" " --whitelist=STRING set list of whitelisted servers (default: '')\n" @@ -560,7 +560,7 @@ static int add_job(struct mx_mysql *mysql, struct mxq_job *j) j->job_id = insert_id; - res = mx_mysql_statement_close(&stmt); + mx_mysql_statement_close(&stmt); return (int)num_rows; } @@ -602,7 +602,7 @@ static int get_active_groups_for_user(struct mx_mysql *mysql, char *username) return count; } -static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags, uint64_t group_id) +static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, uint64_t group_id) { int res; struct mxq_group *g; @@ -724,8 +724,6 @@ int main(int argc, char *argv[]) _mx_cleanup_free_ char *whitelist = NULL; _mx_cleanup_free_ char *tags = NULL; - int flags = 0; - struct mxq_job job; struct mxq_group group; @@ -805,7 +803,7 @@ int main(int argc, char *argv[]) arg_debug = 0; arg_jobflags = 0; arg_groupid = UINT64_UNSET; - arg_tmpdir = 0; + arg_tmpdir = 100; // 100G arg_blacklist = NULL; arg_whitelist = NULL; arg_prerequisites = ""; @@ -902,6 +900,7 @@ int main(int argc, char *argv[]) case 2: mx_log_warning("option --group_priority is deprecated. 
please use --group-priority instead."); + // fall through case 'P': if (mx_strtou16(optctl.optarg, &arg_group_priority) < 0) { mx_log_crit("--group-priority '%s': %m", optctl.optarg); @@ -931,6 +930,7 @@ int main(int argc, char *argv[]) case 4: mx_log_warning("option '--time' is deprecated. please use '--runtime' or '-t' in future calls."); + // fall through case 't': if (mx_strtou32(optctl.optarg, &arg_time) < 0) { unsigned long long int minutes; @@ -1185,7 +1185,7 @@ int main(int argc, char *argv[]) mx_log_info("MySQL: Connection to database established."); - res = mxq_submit_task(mysql, &job, flags, arg_groupid); + res = mxq_submit_task(mysql, &job, arg_groupid); mx_mysql_finish(&mysql); diff --git a/parser.y b/parser.y index 835486cc..503d0df6 100644 --- a/parser.y +++ b/parser.y @@ -53,6 +53,7 @@ bool: '(' bool ')' { $$ = $2; }; #include "xmalloc.h" int yylex (YYSTYPE *lvalp, YYLTYPE *llocp, struct parser_context *ctx) { + (void)llocp; int c = ctx->input[ctx->pos]; while (c == ' ' || c == '\t') @@ -88,6 +89,9 @@ int yylex (YYSTYPE *lvalp, YYLTYPE *llocp, struct parser_context *ctx) { #include void yyerror (YYLTYPE *locp, struct parser_context *ctx, char const *s) { + (void)locp; + (void)ctx; + (void)s; } diff --git a/test_mx_log.c b/test_mx_log.c index 5d1fb05f..8233694d 100644 --- a/test_mx_log.c +++ b/test_mx_log.c @@ -93,7 +93,7 @@ static void test_mx_log_level_syslog_to_mxlog(void) assert(mx_log_level_syslog_to_mxlog(LOG_DEBUG) == MX_LOG_DEBUG); } -int main(int argc, char *argv[]) +int main(void) { test_mx_log_level_syslog_to_mxlog(); test_mx_log_level_mxlog_to_syslog(); diff --git a/test_mx_util.c b/test_mx_util.c index 20b58c81..236918fd 100644 --- a/test_mx_util.c +++ b/test_mx_util.c @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include "mx_util.h" #include "mx_proc.h" @@ -522,41 +524,91 @@ static void test_mx_df(void) { assert(mx_df("/") > 0); } -static void test_mx_call_external(void) { +static void test_mx_pipe_external(void) { 
char *line; errno = 999; - line = mx_call_external("/usr/bin/echo", "123", NULL); - assert(line && strcmp(line, "123") == 0); - free(line); + { + char *argv[] = { "/usr/bin/echo", "123", NULL }; + line = mx_pipe_external("/usr/bin/echo", argv); + assert(line && strcmp(line, "123") == 0); + free(line); + } - line = mx_call_external("/usr/bin/echo", "-n", "123", NULL); - assert(line && strcmp(line, "123") == 0); - free(line); + { + char *argv[] = { "/usr/bin/echo", "-n", "123", NULL }; + line = mx_pipe_external("/usr/bin/echo", argv); + assert(line && strcmp(line, "123") == 0); + free(line); + } - line = mx_call_external("/usr/bin/echo", "-ne", "123\n456\n\n", NULL); - assert(line && strcmp(line, "123\n456\n") == 0); - free(line); + { + char *argv[] = { "/usr/bin/echo", "-ne", "123\n456\n\n", NULL }; + line = mx_pipe_external("/usr/bin/echo", argv); + assert(line && strcmp(line, "123\n456\n") == 0); + free(line); + } - line = mx_call_external("/usr/bin/true", NULL); - assert(line && strcmp(line, "") == 0); - free(line); + { + char *argv[] = { "/usr/bin/true", NULL }; + line = mx_pipe_external("/usr/bin/true", argv); + assert(line && strcmp(line, "") == 0); + free(line); + } assert(errno == 999); - line = mx_call_external("/usr/bin/false", NULL); - assert(line == NULL && errno==EPROTO); + { + char *argv[] = { "/usr/bin/false", NULL }; + line = mx_pipe_external("/usr/bin/false", argv); + assert(line == NULL && errno==EPROTO); + } + + { + char *argv[] = { "/usr/bin/cat", "/usr/bin/bash", NULL }; + line = mx_pipe_external("/usr/bin/cat", argv); + assert(line == NULL && errno==EPROTO); + } + + { + char *argv[] = { "/usr/bin/yes", NULL }; + line = mx_pipe_external("/usr/bin/yes", argv); + assert(line == NULL && errno==EPROTO); + } +} + +static void test_mx_call_external(void) { + int sts; + errno = 999; + + sts = mx_call_external("/usr/bin/true", NULL); + assert(sts == 0); + assert(errno == 999); + + sts = mx_call_external("/usr/bin/false", NULL); + assert(sts == -1); + 
assert(errno == EPROTO); +} + +static void test_mx_closerange(void) { + int res; - line = mx_call_external("/usr/bin/cat", "/usr/bin/bash", NULL); - assert(line == NULL && errno==EPROTO); + int fd1 = open("/dev/null", O_RDONLY); + assert (fd1 != -1); + int fd2 = dup2(fd1, 100); + assert (fd2 == 100 ); - line = mx_call_external("/usr/bin/yes", NULL); - assert(line == NULL && errno==EPROTO); + res=close_range(3, ~0u, 0); + assert(res == 0); + res = close(fd1); + assert (res == -1 && errno == EBADF); + res = close(fd2); + assert (res == -1 && errno == EBADF); } -int main(int argc, char *argv[]) +int main(void) { test_mx_strskipwhitespaces(); test_mx_strtoul(); @@ -576,6 +628,8 @@ int main(int argc, char *argv[]) test_mx_cpuset(); test_listsort(); test_mx_df(); + test_mx_pipe_external(); test_mx_call_external(); + test_mx_closerange(); return 0; } diff --git a/test_mxqd_control.c b/test_mxqd_control.c index c8ba25b3..00f65968 100644 --- a/test_mxqd_control.c +++ b/test_mxqd_control.c @@ -29,7 +29,7 @@ static void test_mxqd_control(void) assert(1); } -int main(int argc, char *argv[]) +int main(void) { test_mxqd_control(); return 0;