diff --git a/.gitignore b/.gitignore index 4509600b..b7570cea 100644 --- a/.gitignore +++ b/.gitignore @@ -14,8 +14,11 @@ mxqkill.o test_mx_util.o test_mx_log.o test_mx_mysql.o +test_mxqd_control.o mxq_log.o mx_mysql.o +mxqd_control.o + mxqsub /mxqsub.1 @@ -26,7 +29,8 @@ mxqd mxqps test_mx_util test_mx_log -test_mx_mysql +test_mx_mysql +test_mxqd_control /web/pages/mxq/mxq web/lighttpd.conf diff --git a/Makefile b/Makefile index 21c95c5b..e38d03cb 100644 --- a/Makefile +++ b/Makefile @@ -73,12 +73,15 @@ MXQ_MYSQL_DEFAULT_GROUP_DEVELOPMENT = mxqdevel MXQ_INITIAL_PATH = /sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin MXQ_INITIAL_TMPDIR = /tmp +MXQ_FINISHED_JOBSDIR = ${LOCALSTATEDIR}/spool/mxqd + CFLAGS_MXQ_MYSQL_DEFAULT_FILE = -DMXQ_MYSQL_DEFAULT_FILE=\"$(MXQ_MYSQL_DEFAULT_FILE)\" CFLAGS_MXQ_MYSQL_DEFAULT_GROUP = -DMXQ_MYSQL_DEFAULT_GROUP_CLIENT=\"$(MXQ_MYSQL_DEFAULT_GROUP_CLIENT)\" CFLAGS_MXQ_MYSQL_DEFAULT_GROUP += -DMXQ_MYSQL_DEFAULT_GROUP_SERVER=\"$(MXQ_MYSQL_DEFAULT_GROUP_SERVER)\" CFLAGS_MXQ_MYSQL_DEFAULT_GROUP += -DMXQ_MYSQL_DEFAULT_GROUP_DEVELOPMENT=\"$(MXQ_MYSQL_DEFAULT_GROUP_DEVELOPMENT)\" CFLAGS_MXQ_INITIAL_PATH = -DMXQ_INITIAL_PATH=\"$(MXQ_INITIAL_PATH)\" CFLAGS_MXQ_INITIAL_TMPDIR = -DMXQ_INITIAL_TMPDIR=\"$(MXQ_INITIAL_TMPDIR)\" +CFLAGS_MXQ_FINISHED_JOBSDIR = -DMXQ_FINISHED_JOBSDIR=\"${MXQ_FINISHED_JOBSDIR}\" MYSQL_CONFIG = mysql_config @@ -297,6 +300,13 @@ mxq_job.h += mxq_group.h mxqd.h += mxqd.h +### mxqd_control.h ----------------------------------------------------- + +mxqd_control.h += mxqd_control.h +mxqd_control.h += mxq_group.h +mxqd_control.h += mxq_job.h +mxqd_control.h += mxqd.h + ### mx_getopt.h -------------------------------------------------------- mx_getopt.h += mx_getopt.h @@ -403,6 +413,12 @@ mxq_job.o: CFLAGS += $(CFLAGS_MYSQL) clean: CLEAN += mxq_job.o +### mxqd_control.o ----------------------------------------------------- + +mxqd_control.o: $(mxqd_control.h) + +clean: CLEAN += mxqd_control.o + ### mxqd.o 
------------------------------------------------------------- mxqd.o: $(mx_getopt.h) @@ -417,6 +433,7 @@ mxqd.o: $(mx_mysql.h) mxqd.o: CFLAGS += $(CFLAGS_MYSQL) mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_PATH) mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_TMPDIR) +mxqd.o: CFLAGS += $(CFLAGS_MXQ_FINISHED_JOBSDIR) mxqd.o: CFLAGS += -Wno-unused-but-set-variable clean: CLEAN += mxqd.o @@ -448,6 +465,7 @@ mxqd: mx_getopt.o mxqd: mxq_group.o mxqd: mxq_job.o mxqd: mx_mysql.o +mxqd: mxqd_control.o mxqd: LDLIBS += $(LDLIBS_MYSQL) build: mxqd @@ -596,3 +614,18 @@ test_mx_mysql: mx_log.o test_mx_mysql: mx_util.o test_mx_mysql: LDLIBS += $(LDLIBS_MYSQL) clean: CLEAN += test_mx_mysql + + +test_mxqd_control.o: $(mxqd_control.h) +clean: CLEAN += test_mxqd_control.o + +test_mxqd_control: mxqd_control.o +test_mxqd_control: mx_log.o +test_mxqd_control: mx_util.o +test_mxqd_control: mx_mysql.o +test_mxqd_control: mxq_group.o +test_mxqd_control: LDLIBS += $(LDLIBS_MYSQL) + +clean: CLEAN += test_mxqd_control + +test: test_mxqd_control diff --git a/mx_flock.c b/mx_flock.c index bdbbaf30..f6939f22 100644 --- a/mx_flock.c +++ b/mx_flock.c @@ -58,11 +58,7 @@ static inline void _flock_free(struct mx_flock *lock) if (!lock) return; - if (lock->fname) - mx_free_null(lock->fname); - - if (lock->fd >= 0) - _flock_close(lock); + mx_free_null(lock->fname); free(lock); } @@ -112,6 +108,7 @@ struct mx_flock *mx_flock(int operation, char *fmt, ...) 
if (errno == EWOULDBLOCK) return lock; mx_log_err("flock(): %m"); + _flock_close(lock); _flock_free(lock); return NULL; } @@ -155,3 +152,8 @@ int mx_funlock(struct mx_flock *lock) return res; } + +void mx_flock_free(struct mx_flock *lock) +{ + _flock_free(lock); +} diff --git a/mx_flock.h b/mx_flock.h index 22725383..34ab4e8d 100644 --- a/mx_flock.h +++ b/mx_flock.h @@ -10,5 +10,6 @@ struct mx_flock { struct mx_flock *mx_flock(int operation, char *fmt, ...); int mx_funlock(struct mx_flock *lock); +void mx_flock_free(struct mx_flock *lock); #endif diff --git a/mx_util.c b/mx_util.c index f2a74db2..a78dc6b0 100644 --- a/mx_util.c +++ b/mx_util.c @@ -11,8 +11,8 @@ #include -//#include -//#include +#include +#include #include #include "mx_log.h" @@ -1272,3 +1272,30 @@ char *mx_cpuset_to_str(cpu_set_t* cpuset_ptr) mx_strvec_free(strvec); return out; } + +int mx_mkdir_p(char *path, mode_t mode) +{ + struct stat st; + int err; + char *d; + _mx_cleanup_free_ char *copy = NULL; + + if (stat(path, &st) == 0) + return 0; + + copy=mx_strdup_forever(path); + d=copy; + + while (*++d == '/'); + + while ((d = strchr(d, '/'))) { + *d = '\0'; + err = stat(copy, &st) && mkdir(copy, mode); + *d++ = '/'; + if (err) + return -errno; + while (*d == '/') + ++d; + } + return (stat(copy, &st) && mkdir(copy, mode)) ? 
-errno : 0; +} diff --git a/mx_util.h b/mx_util.h index 92614fe5..c67ded7b 100644 --- a/mx_util.h +++ b/mx_util.h @@ -162,4 +162,7 @@ char* mx_strvec_join(char *sep,char **strvec); char* mx_cpuset_to_str(cpu_set_t* cpuset_ptr); int mx_str_to_cpuset(cpu_set_t* cpuset_ptr,char *str); +int mx_mkdir_p(char *path, mode_t mode); + + #endif diff --git a/mxq_job.c b/mxq_job.c index 4b33fda3..49d97a46 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -12,11 +12,12 @@ #include "mx_util.h" #include "mx_log.h" +#include "mx_util.h" #include "mxq_group.h" #include "mxq_job.h" -#define JOB_FIELDS_CNT 36 +#define JOB_FIELDS_CNT 37 #define JOB_FIELDS \ " job_id, " \ " job_status, " \ @@ -35,6 +36,7 @@ " host_hostname, " \ " host_pid, " \ " host_slots, " \ + " host_cpu_set, " \ " UNIX_TIMESTAMP(date_submit) as date_submit, " \ " UNIX_TIMESTAMP(date_start) as date_start, " \ " UNIX_TIMESTAMP(date_end) as date_end, " \ @@ -80,6 +82,7 @@ static int bind_result_job_fields(struct mx_mysql_bind *result, struct mxq_job * res += mx_mysql_bind_var(result, idx++, string, &(j->host_hostname)); res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_pid)); res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_slots)); + res += mx_mysql_bind_var(result, idx++, string, &(j->host_cpu_set_str)); res += mx_mysql_bind_var(result, idx++, int64, &(j->date_submit)); res += mx_mysql_bind_var(result, idx++, int64, &(j->date_start)); res += mx_mysql_bind_var(result, idx++, int64, &(j->date_end)); @@ -146,6 +149,7 @@ char *mxq_job_status_to_name(uint64_t status) void mxq_job_free_content(struct mxq_job *j) { mx_free_null(j->job_workdir); + mx_free_null(j->job_argv); mx_free_null(j->job_argv_str); mx_free_null(j->job_stdout); mx_free_null(j->job_stderr); @@ -160,17 +164,33 @@ void mxq_job_free_content(struct mxq_job *j) mx_free_null(j->host_id); mx_free_null(j->server_id); mx_free_null(j->host_hostname); - mx_free_null(j->job_argv); - j->job_argv = NULL; + mx_free_null(j->host_cpu_set_str); +} + +static int 
do_jobs_statement(struct mx_mysql *mysql, char *query, struct mx_mysql_bind *param, struct mxq_job **jobs) +{ + int res,i; + struct mxq_job j = {0}; + struct mx_mysql_bind result = {0}; + + res = bind_result_job_fields(&result, &j); + assert(res == 0); + + res = mx_mysql_do_statement(mysql, query, param, &result, &j, (void **)jobs, sizeof(**jobs)); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + for (i=0;igroup_id)); assert(res == 0); - res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); + res=do_jobs_statement(mysql, query, ¶m, &jobs); if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); return res; } @@ -244,9 +256,7 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job ** { int res; struct mxq_job *jobs = NULL; - struct mxq_job j = {0}; struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; assert(mysql); assert(mxq_jobs); @@ -268,12 +278,8 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job ** res += mx_mysql_bind_var(¶m, 1, uint64, &job_status); assert(res == 0); - res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); + res=do_jobs_statement(mysql, query, ¶m, &jobs); if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); return res; } @@ -415,22 +421,24 @@ int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job) " job_status = " status_str(MXQ_JOB_STATUS_RUNNING) "," " date_start = NULL," " host_pid = ?," - " host_slots = ?" + " host_slots = ?," + " host_cpu_set = ?" " WHERE job_id = ?" " AND job_status = " status_str(MXQ_JOB_STATUS_LOADED) " AND host_hostname = ?" " AND server_id = ?" 
" AND host_pid = 0"; - res = mx_mysql_bind_init_param(¶m, 5); + res = mx_mysql_bind_init_param(¶m, 6); assert(res == 0); res = 0; res += mx_mysql_bind_var(¶m, 0, uint32, &(job->host_pid)); res += mx_mysql_bind_var(¶m, 1, uint32, &(job->host_slots)); - res += mx_mysql_bind_var(¶m, 2, uint64, &(job->job_id)); - res += mx_mysql_bind_var(¶m, 3, string, &(job->host_hostname)); - res += mx_mysql_bind_var(¶m, 4, string, &(job->server_id)); + res += mx_mysql_bind_var(¶m, 2, string, &(job->host_cpu_set_str)); + res += mx_mysql_bind_var(¶m, 3, uint64, &(job->job_id)); + res += mx_mysql_bind_var(¶m, 4, string, &(job->host_hostname)); + res += mx_mysql_bind_var(¶m, 5, string, &(job->server_id)); assert(res == 0); res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, ¶m); @@ -573,6 +581,29 @@ int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname return res; } +int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job) +{ + int res; + struct mx_mysql_bind param = {0}; + + char *query = + "UPDATE mxq_job SET" + " job_status = " status_str(MXQ_JOB_STATUS_UNKNOWN) + " WHERE job_id = ?"; + + res = mx_mysql_bind_init_param(¶m, 1); + res += mx_mysql_bind_var(¶m, 0, uint64, &job->job_id); + assert(res == 0); + + res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, ¶m); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + return res; +} + int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j) { if (!mx_streq(j->job_stdout, "/dev/null")) { @@ -605,9 +636,7 @@ int mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mx { int res; struct mxq_job *jobs = NULL; - struct mxq_job j = {0}; struct mx_mysql_bind param = {0}; - struct mx_mysql_bind result = {0}; assert(mysql); assert(mxq_jobs); @@ -636,12 +665,8 @@ int mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mx res += mx_mysql_bind_var(¶m, 2, uint64, &group_id); assert(res == 0); - 
res = bind_result_job_fields(&result, &j); - assert(res == 0); - - res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); + res=do_jobs_statement(mysql, query, ¶m, &jobs); if (res < 0) { - mx_log_err("mx_mysql_do_statement(): %m"); return res; } @@ -704,3 +729,34 @@ int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *m return 1; } + +int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id) +{ + int res; + struct mxq_job *jobs = NULL; + + struct mx_mysql_bind param = {0}; + + char *query = + "SELECT" + JOB_FIELDS + " FROM mxq_job" + " WHERE job_status = " status_str(MXQ_JOB_STATUS_RUNNING) + " AND host_hostname=?" + " AND server_id=?"; + res = mx_mysql_bind_init_param(¶m, 2); + assert(res == 0); + + res=0; + res += mx_mysql_bind_var(¶m, 0, string, &hostname); + res += mx_mysql_bind_var(¶m, 1, string, &server_id); + assert(res == 0); + + res=do_jobs_statement(mysql, query, ¶m, &jobs); + if (res < 0) { + return res; + } + + *mxq_jobs = jobs; + return res; +} diff --git a/mxq_job.h b/mxq_job.h index 3bf62bbf..921a484e 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -45,6 +45,7 @@ struct mxq_job { uint32_t host_pid; uint32_t host_slots; cpu_set_t host_cpu_set; + char * host_cpu_set_str; int64_t date_submit; int64_t date_start; @@ -103,9 +104,11 @@ int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *se int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job); +int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname, char *server_id); int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j); int 
mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t group_id, char *hostname, char *server_id); int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id, char *host_id); +int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id); #endif diff --git a/mxqd.c b/mxqd.c index 86a1b96f..97277ae8 100644 --- a/mxqd.c +++ b/mxqd.c @@ -10,8 +10,9 @@ #include #include #include - +#include #include +#include #include @@ -40,6 +41,8 @@ #include "mxqd.h" #include "mxq.h" +#include "mxqd_control.h" + #ifndef MXQ_INITIAL_PATH # define MXQ_INITIAL_PATH "/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin" #endif @@ -52,8 +55,10 @@ volatile sig_atomic_t global_sigint_cnt=0; volatile sig_atomic_t global_sigterm_cnt=0; +volatile sig_atomic_t global_sigquit_cnt=0; int mxq_redirect_output(char *stdout_fname, char *stderr_fname); +void server_free(struct mxq_server *server); static void print_usage(void) { @@ -124,7 +129,6 @@ static void cpuset_init_job(cpu_set_t *job_cpu_set,cpu_set_t *available,cpu_set_ for (cpu=CPU_SETSIZE-1;slots&&cpu>=0;cpu--) { if (CPU_ISSET(cpu,available) && !CPU_ISSET(cpu,running)) { CPU_SET(cpu,job_cpu_set); - CPU_SET(cpu,running); slots--; } } @@ -479,6 +483,13 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) exit(2); } + mx_asprintf_forever(&server->finished_jobsdir,"%s/%s",MXQ_FINISHED_JOBSDIR,server->server_id); + res=mx_mkdir_p(server->finished_jobsdir,0700); + if (res<0) { + mx_log_err("MAIN: mkdir %s failed: %m. 
Exiting.",MXQ_FINISHED_JOBSDIR); + exit(EX_IOERR); + } + if (arg_daemonize) { res = daemon(0, 1); if (res == -1) { @@ -573,361 +584,49 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) return 1; } -/**********************************************************************/ - -void group_init(struct mxq_group_list *group) -{ - struct mxq_server *s; - struct mxq_group *g; - - long double memory_threads; - long double memory_per_thread; - long double memory_max_available; - unsigned long slots_per_job; - unsigned long jobs_max; - unsigned long slots_max; - unsigned long memory_max; - - assert(group); - assert(group->user); - assert(group->user->server); - - s = group->user->server; - g = &group->group; - - memory_per_thread = (long double)g->job_memory / (long double) g->job_threads; - memory_max_available = (long double)s->memory_total * (long double)s->memory_max_per_slot / memory_per_thread; - - if (memory_max_available > s->memory_total) - memory_max_available = s->memory_total; - - slots_per_job = ceill((long double)g->job_memory / s->memory_avg_per_slot); - - if (slots_per_job < g->job_threads) - slots_per_job = g->job_threads; - - memory_threads = memory_max_available / memory_per_thread; - - if (memory_per_thread > s->memory_max_per_slot) { - jobs_max = memory_threads + 0.5; - } else if (memory_per_thread > s->memory_avg_per_slot) { - jobs_max = memory_threads + 0.5; - } else { - jobs_max = s->slots; - } - jobs_max /= g->job_threads; - - /* limit maximum number of jobs on user/group request */ - if (g->job_max_per_node && jobs_max > g->job_max_per_node) - jobs_max = g->job_max_per_node; - - slots_max = jobs_max * slots_per_job; - memory_max = jobs_max * g->job_memory; - - if (group->memory_per_thread != memory_per_thread - || group->memory_max_available != memory_max_available - || group->memory_max_available != memory_max_available - || group->slots_per_job != slots_per_job - || group->jobs_max != jobs_max - || group->slots_max != slots_max - 
|| group->memory_max != memory_max) - mx_log_info(" group=%s(%u):%lu jobs_max=%lu slots_max=%lu memory_max=%lu slots_per_job=%lu :: group %sinitialized.", - g->user_name, g->user_uid, g->group_id, jobs_max, slots_max, memory_max, slots_per_job, - group->orphaned?"re":""); - - group->orphaned = 0; - group->memory_per_thread = memory_per_thread; - group->memory_max_available = memory_max_available; - group->slots_per_job = slots_per_job; - group->jobs_max = jobs_max; - group->slots_max = slots_max; - group->memory_max = memory_max; -} - -/**********************************************************************/ - -struct mxq_job_list *server_remove_job_by_pid(struct mxq_server *server, pid_t pid) -{ - struct mxq_user_list *user; - struct mxq_group_list *group; - struct mxq_job_list *job, *prev; - - for (user=server->users; user; user=user->next) { - for (group=user->groups; group; group=group->next) { - for (job=group->jobs, prev=NULL; job; prev=job,job=job->next) { - if (job->job.host_pid == pid) { - if (prev) { - prev->next = job->next; - } else { - assert(group->jobs); - assert(group->jobs == job); - - group->jobs = job->next; - } - - group->job_cnt--; - user->job_cnt--; - server->job_cnt--; - - group->slots_running -= job->job.host_slots; - user->slots_running -= job->job.host_slots; - server->slots_running -= job->job.host_slots; - - group->threads_running -= group->group.job_threads; - user->threads_running -= group->group.job_threads; - server->threads_running -= group->group.job_threads; - - group->group.group_jobs_running--; - - group->jobs_running--; - user->jobs_running--; - server->jobs_running--; - - group->memory_used -= group->group.job_memory; - user->memory_used -= group->group.job_memory; - server->memory_used -= group->group.job_memory; - - return job; - } - } - } - } - return NULL; -} - -/**********************************************************************/ - -struct mxq_user_list *user_list_find_uid(struct mxq_user_list *list, uint32_t uid) -{ - 
struct mxq_user_list *u; - - for (u = list; u; u = u->next) { - assert(u->groups); - if (u->groups[0].group.user_uid == uid) { - return u; - } - } - return NULL; -} - -/**********************************************************************/ - -struct mxq_group_list *group_list_find_group(struct mxq_group_list *list, struct mxq_group *group) +static void reset_signals() { - struct mxq_group_list *g; - - assert(group); - - for (g = list; g; g = g->next) { - if (g->group.group_id == group->group_id) { - return g; - } - } - return NULL; + signal(SIGINT, SIG_DFL); + signal(SIGTERM, SIG_DFL); + signal(SIGQUIT, SIG_DFL); + signal(SIGHUP, SIG_DFL); + signal(SIGTSTP, SIG_DFL); + signal(SIGTTIN, SIG_DFL); + signal(SIGTTOU, SIG_DFL); + signal(SIGCHLD, SIG_DFL); + signal(SIGPIPE, SIG_DFL); } -/**********************************************************************/ - -struct mxq_job_list *group_add_job(struct mxq_group_list *group, struct mxq_job *job) +static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job) { - struct mxq_job_list *j; - struct mxq_job_list *jlist; - struct mxq_server *server; - struct mxq_user_list *user; - - struct mxq_group *mxqgrp; - - assert(group); - assert(group->user); - assert(group->user->server); - assert(job->job_status == MXQ_JOB_STATUS_RUNNING); - - mxqgrp = &group->group; - user = group->user; - server = user->server; - - j = mx_calloc_forever(1, sizeof(*j)); - assert(j); - - jlist = group->jobs; - - memcpy(&j->job, job, sizeof(*job)); - - j->group = group; - j->next = jlist; - - group->jobs = j; - - group->job_cnt++; - user->job_cnt++; - server->job_cnt++; - - group->slots_running += group->slots_per_job; - user->slots_running += group->slots_per_job; - server->slots_running += group->slots_per_job; - - group->threads_running += mxqgrp->job_threads; - user->threads_running += mxqgrp->job_threads; - server->threads_running += mxqgrp->job_threads; - - mxqgrp->group_jobs_running++; - mxqgrp->group_jobs_inq--; - - 
group->jobs_running++; - user->jobs_running++; - server->jobs_running++; - - group->memory_used += mxqgrp->job_memory; - user->memory_used += mxqgrp->job_memory; - server->memory_used += mxqgrp->job_memory; - - assert(j); - return j; -} -/**********************************************************************/ - -struct mxq_group_list *user_add_group(struct mxq_user_list *user, struct mxq_group *group) -{ - struct mxq_group_list *g; - struct mxq_group_list *glist; - - assert(user); - - g = mx_calloc_forever(1, sizeof(*g)); - assert(g); - - glist = user->groups; - - memcpy(&g->group, group, sizeof(*group)); - - g->user = user; - g->next = glist; - - user->groups = g; - user->group_cnt++; - - assert(user->server); - user->server->group_cnt++; - - group_init(g); - - assert(g); - return g; -} - -/**********************************************************************/ - -struct mxq_group_list *server_add_user(struct mxq_server *server, struct mxq_group *group) -{ - struct mxq_user_list *user; - struct mxq_user_list *ulist; - struct mxq_group_list *glist; - - assert(server); - assert(group); - - user = mx_calloc_forever(1, sizeof(*user)); - assert(user); - - user->server = server; - - glist = user_add_group(user, group); - assert(glist); - - ulist = server->users; - - user->next = ulist; - - server->users = user; - server->user_cnt++; - - assert(glist); - return glist; -} - -/**********************************************************************/ - -struct mxq_group_list *user_update_groupdata(struct mxq_user_list *user, struct mxq_group *group) -{ - struct mxq_group_list *glist; - - glist = group_list_find_group(user->groups, group); - if (!glist) { - return user_add_group(user, group); - } - - mxq_group_free_content(&glist->group); - memcpy(&glist->group, group, sizeof(*group)); - - group_init(glist); - - return glist; -} - -/**********************************************************************/ - -static struct mxq_group_list *server_update_groupdata(struct mxq_server 
*server, struct mxq_group *group) -{ - struct mxq_user_list *user; - - user = user_list_find_uid(server->users, group->user_uid); - if (!user) { - return server_add_user(server, group); - } - - return user_update_groupdata(user, group); -} - -static int init_child_process(struct mxq_group_list *group, struct mxq_job *j) -{ - struct mxq_group *g; - struct mxq_server *s; + struct mxq_group *group; struct passwd *passwd; - pid_t pid; int res; int fh; struct rlimit rlim; - assert(j); - assert(group); - assert(group->user); - assert(group->user->server); - - s = group->user->server; - g = &group->group; - - /** restore signal handler **/ - signal(SIGINT, SIG_DFL); - signal(SIGTERM, SIG_DFL); - signal(SIGQUIT, SIG_DFL); - signal(SIGHUP, SIG_DFL); - signal(SIGTSTP, SIG_DFL); - signal(SIGTTIN, SIG_DFL); - signal(SIGTTOU, SIG_DFL); - signal(SIGCHLD, SIG_DFL); + assert(job); + assert(glist); + assert(glist->user); + assert(glist->user->server); - /* reset SIGPIPE which seems to be ignored by mysqlclientlib (?) 
*/ - signal(SIGPIPE, SIG_DFL); + server = glist->user->server; + group = &glist->group; - /** set sessionid and pgrp leader **/ - pid = setsid(); - if (pid == -1) { - mx_log_err("job=%s(%d):%lu:%lu setsid(): %m", - g->user_name, g->user_uid, g->group_id, j->job_id); - } + reset_signals(); - passwd = getpwuid(g->user_uid); + passwd = getpwuid(group->user_uid); if (!passwd) { mx_log_err("job=%s(%d):%lu:%lu getpwuid(): %m", - g->user_name, g->user_uid, g->group_id, j->job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); return 0; } - if (!mx_streq(passwd->pw_name, g->user_name)) { + if (!mx_streq(passwd->pw_name, group->user_name)) { mx_log_err("job=%s(%d):%lu:%lu user_uid=%d does not map to user_name=%s but to pw_name=%s: Invalid user mapping", - g->user_name, g->user_uid, g->group_id, j->job_id, - g->user_uid, g->user_name, passwd->pw_name); + group->user_name, group->user_uid, group->group_id, job->job_id, + group->user_uid, group->user_name, passwd->pw_name); return 0; } @@ -937,56 +636,56 @@ static int init_child_process(struct mxq_group_list *group, struct mxq_job *j) res = clearenv(); if (res != 0) { mx_log_err("job=%s(%d):%lu:%lu clearenv(): %m", - g->user_name, g->user_uid, g->group_id, j->job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); return 0; } - mx_setenv_forever("USER", g->user_name); - mx_setenv_forever("USERNAME", g->user_name); - mx_setenv_forever("LOGNAME", g->user_name); - mx_setenv_forever("PATH", s->initial_path); - mx_setenv_forever("TMPDIR", s->initial_tmpdir); - mx_setenv_forever("PWD", j->job_workdir); + mx_setenv_forever("USER", group->user_name); + mx_setenv_forever("USERNAME", group->user_name); + mx_setenv_forever("LOGNAME", group->user_name); + mx_setenv_forever("PATH", server->initial_path); + mx_setenv_forever("TMPDIR", server->initial_tmpdir); + mx_setenv_forever("PWD", job->job_workdir); mx_setenv_forever("HOME", passwd->pw_dir); mx_setenv_forever("SHELL", passwd->pw_shell); 
mx_setenv_forever("HOSTNAME", mx_hostname()); - mx_setenvf_forever("JOB_ID", "%lu", j->job_id); - mx_setenvf_forever("MXQ_JOBID", "%lu", j->job_id); - mx_setenvf_forever("MXQ_THREADS", "%d", g->job_threads); - mx_setenvf_forever("MXQ_SLOTS", "%lu", group->slots_per_job); - mx_setenvf_forever("MXQ_MEMORY", "%lu", g->job_memory); - mx_setenvf_forever("MXQ_TIME", "%d", g->job_time); - mx_setenv_forever("MXQ_HOSTID", s->host_id); - mx_setenv_forever("MXQ_HOSTNAME", s->hostname); - mx_setenv_forever("MXQ_SERVERID", s->server_id); + mx_setenvf_forever("JOB_ID", "%lu", job->job_id); + mx_setenvf_forever("MXQ_JOBID", "%lu", job->job_id); + mx_setenvf_forever("MXQ_THREADS", "%d", group->job_threads); + mx_setenvf_forever("MXQ_SLOTS", "%lu", glist->slots_per_job); + mx_setenvf_forever("MXQ_MEMORY", "%lu", group->job_memory); + mx_setenvf_forever("MXQ_TIME", "%d", group->job_time); + mx_setenv_forever("MXQ_HOSTID", server->host_id); + mx_setenv_forever("MXQ_HOSTNAME", server->hostname); + mx_setenv_forever("MXQ_SERVERID", server->server_id); fh = open("/proc/self/loginuid", O_WRONLY|O_TRUNC); if (fh == -1) { mx_log_err("job=%s(%d):%lu:%lu open(%s) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id, "/proc/self/loginuid"); + group->user_name, group->user_uid, group->group_id, job->job_id, "/proc/self/loginuid"); return 0; } - dprintf(fh, "%d", g->user_uid); + dprintf(fh, "%d", group->user_uid); close(fh); /* set memory limits */ - rlim.rlim_cur = g->job_memory*1024*1024; - rlim.rlim_max = g->job_memory*1024*1024; + rlim.rlim_cur = group->job_memory*1024*1024; + rlim.rlim_max = group->job_memory*1024*1024; res = setrlimit(RLIMIT_AS, &rlim); if (res == -1) mx_log_err("job=%s(%d):%lu:%lu setrlimit(RLIMIT_AS, ...) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); res = setrlimit(RLIMIT_DATA, &rlim); if (res == -1) mx_log_err("job=%s(%d):%lu:%lu setrlimit(RLIMIT_DATA, ...) 
failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); res = setrlimit(RLIMIT_RSS, &rlim); if (res == -1) mx_log_err("job=%s(%d):%lu:%lu setrlimit(RLIMIT_RSS, ...) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); /* disable core files */ rlim.rlim_cur = 0; @@ -995,57 +694,57 @@ static int init_child_process(struct mxq_group_list *group, struct mxq_job *j) res = setrlimit(RLIMIT_CORE, &rlim); if (res == -1) mx_log_err("job=%s(%d):%lu:%lu setrlimit(RLIMIT_CORE, ...) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); /* set single threaded time limits */ - if (g->job_threads == 1) { + if (group->job_threads == 1) { /* set cpu time limits - hardlimit is 105% of softlimit */ - rlim.rlim_cur = g->job_time*60; - rlim.rlim_cur = g->job_time*63; + rlim.rlim_cur = group->job_time*60; + rlim.rlim_cur = group->job_time*63; res = setrlimit(RLIMIT_CPU, &rlim); if (res == -1) mx_log_err("job=%s(%d):%lu:%lu setrlimit(RLIMIT_CPU, ...) 
failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); } if(RUNNING_AS_ROOT) { - res = initgroups(passwd->pw_name, g->user_gid); + res = initgroups(passwd->pw_name, group->user_gid); if (res == -1) { mx_log_err("job=%s(%d):%lu:%lu initgroups() failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); return 0; } - res = setregid(g->user_gid, g->user_gid); + res = setregid(group->user_gid, group->user_gid); if (res == -1) { mx_log_err("job=%s(%d):%lu:%lu setregid(%d, %d) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id, - g->user_gid, g->user_gid); + group->user_name, group->user_uid, group->group_id, job->job_id, + group->user_gid, group->user_gid); return 0; } - res = setreuid(g->user_uid, g->user_uid); + res = setreuid(group->user_uid, group->user_uid); if (res == -1) { mx_log_err("job=%s(%d):%lu:%lu setreuid(%d, %d) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id, - g->user_uid, g->user_uid); + group->user_name, group->user_uid, group->group_id, job->job_id, + group->user_uid, group->user_uid); return 0; } } - res = chdir(j->job_workdir); + res = chdir(job->job_workdir); if (res == -1) { mx_log_err("job=%s(%d):%lu:%lu chdir(%s) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id, - j->job_workdir); + group->user_name, group->user_uid, group->group_id, job->job_id, + job->job_workdir); return 0; } - umask(j->job_umask); + umask(job->job_umask); - res=sched_setaffinity(0,sizeof(j->host_cpu_set),&j->host_cpu_set); + res=sched_setaffinity(0,sizeof(job->host_cpu_set),&job->host_cpu_set); if (res<0) mx_log_warning("sched_setaffinity: $m"); return 1; @@ -1067,8 +766,8 @@ int mxq_redirect_open(char *fname) } else if (!mx_streq(fname, "/dev/null")) { res = unlink(fname); if (res == -1 && errno != ENOENT) { - mx_log_err("unlink() failed: %m"); - return -2; + 
mx_log_err("%s: unlink() failed: %m", fname); + return -2; } flags |= O_EXCL; } @@ -1142,32 +841,207 @@ int mxq_redirect_input(char *stdin_fname) return 1; } +int user_process(struct mxq_group_list *glist, struct mxq_job *job) +{ + int res; + char **argv; + + struct mxq_group *group; + + group = &glist->group; + + res = init_child_process(glist, job); + if (!res) + return(-1); + + mxq_job_set_tmpfilenames(group, job); + + res = mxq_redirect_input("/dev/null"); + if (res < 0) { + mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_input() failed (%d): %m", + group->user_name, + group->user_uid, + group->group_id, + job->job_id, + res); + return(res); + } + + res = mxq_redirect_output(job->tmp_stdout, job->tmp_stderr); + if (res < 0) { + mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_output() failed (%d): %m", + group->user_name, + group->user_uid, + group->group_id, + job->job_id, + res); + return(res); + } + + argv = mx_strvec_from_str(job->job_argv_str); + if (!argv) { + mx_log_err("job=%s(%d):%lu:%lu Can't recalculate commandline. 
str_to_strvev(%s) failed: %m", + group->user_name, + group->user_uid, + group->group_id, + job->job_id, + job->job_argv_str); + return -errno; + } + + res = execvp(argv[0], argv); + mx_log_err("job=%s(%d):%lu:%lu execvp(\"%s\", ...): %m", + group->user_name, + group->user_uid, + group->group_id, + job->job_id, + argv[0]); + return res; +} + +int reaper_process(struct mxq_server *server,struct mxq_group_list *glist, struct mxq_job *job) { + pid_t pid; + struct rusage rusage; + int status; + pid_t waited_pid; + int waited_status; + struct timeval now; + struct timeval realtime; + _mx_cleanup_free_ char *finished_job_filename=NULL; + _mx_cleanup_free_ char *finished_job_tmpfilename=NULL; + FILE *out; + int res; + + struct mxq_group *group; + + group = &glist->group; + + reset_signals(); + + signal(SIGINT, SIG_IGN); + signal(SIGTERM, SIG_IGN); + signal(SIGHUP, SIG_IGN); + signal(SIGXCPU, SIG_IGN); + + res = setsid(); + if (res < 0) { + mx_log_warning("reaper_process setsid: %m"); + return res; + } + + res = prctl(PR_SET_CHILD_SUBREAPER, 1); + if (res < 0) { + mx_log_err("set subreaper: %m"); + return res; + } + + pid = fork(); + if (pid < 0) { + mx_log_err("fork: %m"); + return pid; + } else if (pid == 0) { + mx_log_info("starting user process."); + res = user_process(glist, job); + _exit(EX__MAX+1); + } + gettimeofday(&job->stats_starttime, NULL); -unsigned long start_job(struct mxq_group_list *group) + while (1) { + waited_pid = wait(&waited_status); + if (waited_pid < 0) { + if (errno==ECHILD) { + break; + } else { + mx_log_warning("reaper: wait: %m"); + sleep(1); + } + } + if (waited_pid == pid) { + status = waited_status; + } + } + gettimeofday(&now, NULL); + timersub(&now, &job->stats_starttime, &realtime); + res = getrusage(RUSAGE_CHILDREN, &rusage); + if (res < 0) { + mx_log_err("reaper: getrusage: %m"); + return(res); + } + + mx_asprintf_forever(&finished_job_filename, "%s/%lu.stat", server->finished_jobsdir, job->job_id); + 
mx_asprintf_forever(&finished_job_tmpfilename, "%s.tmp", finished_job_filename); + + out=fopen(finished_job_tmpfilename,"w"); + if (!out) { + mx_log_fatal("%s: %m",finished_job_tmpfilename); + return (-errno); + } + + fprintf(out,"1 %d %d %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld\n", + getpid(), + status, + realtime.tv_sec,realtime.tv_usec, + rusage.ru_utime.tv_sec,rusage.ru_utime.tv_usec, + rusage.ru_stime.tv_sec,rusage.ru_stime.tv_usec, + rusage.ru_maxrss, + rusage.ru_ixrss, + rusage.ru_idrss, + rusage.ru_isrss, + rusage.ru_minflt, + rusage.ru_majflt, + rusage.ru_nswap, + rusage.ru_inblock, + rusage.ru_oublock, + rusage.ru_msgsnd, + rusage.ru_msgrcv, + rusage.ru_nsignals, + rusage.ru_nvcsw, + rusage.ru_nivcsw + ); + fflush(out); + fsync(fileno(out)); + fclose(out); + res=rename(finished_job_tmpfilename,finished_job_filename); + if (res<0) { + mx_log_fatal("rename %s: %m",finished_job_tmpfilename); + return(res); + } + return(0); +} + +unsigned long start_job(struct mxq_group_list *glist) { struct mxq_server *server; - struct mxq_job mxqjob; - struct mxq_job_list *job; + struct mxq_job_list *jlist; + + struct mxq_job _mxqjob; + struct mxq_job *job; + + struct mxq_group *group; + pid_t pid; int res; - char **argv; - assert(group); - assert(group->user); - assert(group->user->server); - - server = group->user->server; + assert(glist); + assert(glist->user); + assert(glist->user->server); - res = mxq_load_job_from_group_for_server(server->mysql, &mxqjob, group->group.group_id, server->hostname, server->server_id, server->host_id); + server = glist->user->server; + group = &glist->group; + job = &_mxqjob; + res = mxq_load_job_from_group_for_server(server->mysql, job, group->group_id, server->hostname, server->server_id, server->host_id); if (!res) { return 0; } mx_log_info(" job=%s(%d):%lu:%lu :: new job loaded.", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id); + group->user_name, 
group->user_uid, group->group_id, job->job_id); - cpuset_init_job(&mxqjob.host_cpu_set,&server->cpu_set_available,&server->cpu_set_running,group->slots_per_job); - cpuset_log(" job assigned cpus: ",&mxqjob.host_cpu_set); + cpuset_init_job(&job->host_cpu_set, &server->cpu_set_available, &server->cpu_set_running, glist->slots_per_job); + mx_free_null(job->host_cpu_set_str); + job->host_cpu_set_str = mx_cpuset_to_str(&job->host_cpu_set); + + mx_log_info("job assigned cpus: [%s]", job->host_cpu_set_str); mx_mysql_disconnect(server->mysql); @@ -1176,169 +1050,147 @@ unsigned long start_job(struct mxq_group_list *group) mx_log_err("fork: %m"); return 0; } else if (pid == 0) { - mxqjob.host_pid = getpid(); + job->host_pid = getpid(); mx_log_info(" job=%s(%d):%lu:%lu host_pid=%d pgrp=%d :: new child process forked.", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, - mxqjob.host_pid, getpgrp()); - - res = init_child_process(group, &mxqjob); - if (!res) - _exit(EX__MAX + 1); - - mxq_job_set_tmpfilenames(&group->group, &mxqjob); - - - res = mxq_redirect_input("/dev/null"); - if (res < 0) { - mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_input() failed (%d): %m", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, - res); - _exit(EX__MAX + 1); - } + group->user_name, + group->user_uid, + group->group_id, + job->job_id, + job->host_pid, + getpgrp()); - res = mxq_redirect_output(mxqjob.tmp_stdout, mxqjob.tmp_stderr); - if (res < 0) { - mx_log_err(" job=%s(%d):%lu:%lu mxq_redirect_output() failed (%d): %m", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, - res); - _exit(EX__MAX + 1); - } + mx_log_info("starting reaper process."); + mx_mysql_finish(&server->mysql); + res = reaper_process(server, glist, job); - argv = mx_strvec_from_str(mxqjob.job_argv_str); - if (!argv) { - mx_log_err("job=%s(%d):%lu:%lu Can't recaculate commandline. 
str_to_strvev(%s) failed: %m", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, - mxqjob.job_argv_str); - _exit(EX__MAX + 1); - } + mxq_job_free_content(job); - execvp(argv[0], argv); - mx_log_err("job=%s(%d):%lu:%lu execvp(\"%s\", ...): %m", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id, - argv[0]); - _exit(EX__MAX + 1); + mx_log_info("shutting down reaper, bye bye."); + mx_log_finish(); + server_free(server); + _exit(res<0 ? EX__MAX+1 : 0); } - gettimeofday(&mxqjob.stats_starttime, NULL); + gettimeofday(&job->stats_starttime, NULL); mx_mysql_connect_forever(&(server->mysql)); - mxqjob.host_pid = pid; - mxqjob.host_slots = group->slots_per_job; - res = mxq_set_job_status_running(server->mysql, &mxqjob); + job->host_pid = pid; + job->host_slots = glist->slots_per_job; + res = mxq_set_job_status_running(server->mysql, job); if (res < 0) mx_log_err("job=%s(%d):%lu:%lu mxq_job_update_status_running(): %m", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); if (res == 0) mx_log_err("job=%s(%d):%lu:%lu mxq_job_update_status_running(): Job not found.", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); - job = group_add_job(group, &mxqjob); - assert(job); + jlist = group_list_add_job(glist, job); + assert(jlist); mx_log_info(" job=%s(%d):%lu:%lu :: added running job to watch queue.", - group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id); + group->user_name, group->user_uid, group->group_id, job->job_id); return 1; } /**********************************************************************/ -unsigned long start_user(struct mxq_user_list *user, int job_limit, long slots_to_start) +unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long 
slots_to_start) { struct mxq_server *server; - struct mxq_group_list *group; + struct mxq_group_list *glist; struct mxq_group_list *gnext = NULL; - struct mxq_group *mxqgrp; + struct mxq_group *group; unsigned int prio; unsigned char started = 0; unsigned long slots_started = 0; int jobs_started = 0; - assert(user); - assert(user->server); - assert(user->groups); + assert(ulist); + assert(ulist->server); + assert(ulist->groups); - server = user->server; - group = user->groups; - mxqgrp = &group->group; + server = ulist->server; + glist = ulist->groups; + group = &glist->group; - prio = mxqgrp->group_priority; + prio = group->group_priority; assert(slots_to_start <= server->slots - server->slots_running); mx_log_debug(" user=%s(%d) slots_to_start=%ld job_limit=%d :: trying to start jobs for user.", - mxqgrp->user_name, mxqgrp->user_uid, slots_to_start, job_limit); + group->user_name, group->user_uid, slots_to_start, job_limit); - for (group=user->groups; group && slots_to_start > 0 && (!job_limit || jobs_started < job_limit); group=gnext) { + for (glist = ulist->groups; glist && slots_to_start > 0 && (!job_limit || jobs_started < job_limit); glist = gnext) { - mxqgrp = &group->group; + group = &glist->group; - assert(group->jobs_running <= mxqgrp->group_jobs); - assert(group->jobs_running <= group->jobs_max); + assert(glist->jobs_running <= group->group_jobs); + assert(glist->jobs_running <= glist->jobs_max); - if (group->jobs_running == mxqgrp->group_jobs) { - gnext = group->next; + if (glist->jobs_running == group->group_jobs) { + gnext = glist->next; if (!gnext && started) { - gnext = group->user->groups; + gnext = ulist->groups; started = 0; } continue; } - if (group->jobs_running == group->jobs_max) { - gnext = group->next; + if (glist->jobs_running == glist->jobs_max) { + gnext = glist->next; if (!gnext && started) { - gnext = group->user->groups; + gnext = ulist->groups; started = 0; } continue; } - if (mxq_group_jobs_inq(mxqgrp) == 0) { - gnext = 
group->next; + if (mxq_group_jobs_inq(group) == 0) { + gnext = glist->next; if (!gnext && started) { - gnext = group->user->groups; + gnext = ulist->groups; started = 0; } continue; } - if (group->slots_per_job > slots_to_start) { - gnext = group->next; + if (glist->slots_per_job > slots_to_start) { + gnext = glist->next; if (!gnext && started) { - gnext = group->user->groups; + gnext = ulist->groups; started = 0; } continue; } - if (mxqgrp->group_priority < prio) { + if (group->group_priority < prio) { if (started) { - gnext = group->user->groups; + gnext = ulist->groups; started = 0; continue; } - prio = mxqgrp->group_priority; + prio = group->group_priority; } mx_log_info(" group=%s(%d):%lu slots_to_start=%ld slots_per_job=%lu :: trying to start job for group.", - mxqgrp->user_name, mxqgrp->user_uid, mxqgrp->group_id, slots_to_start, group->slots_per_job); + group->user_name, group->user_uid, group->group_id, slots_to_start, glist->slots_per_job); - if (start_job(group)) { + if (start_job(glist)) { - slots_to_start -= group->slots_per_job; + slots_to_start -= glist->slots_per_job; jobs_started++; - slots_started += group->slots_per_job; + slots_started += glist->slots_per_job; started = 1; } - gnext = group->next; + gnext = glist->next; if (!gnext && started) { - gnext = group->user->groups; + gnext = ulist->groups; started = 0; } } @@ -1349,12 +1201,13 @@ unsigned long start_user(struct mxq_user_list *user, int job_limit, long slots_t unsigned long start_users(struct mxq_server *server) { - long slots_to_start; unsigned long slots_started; - int started = 0; unsigned long slots_started_total = 0; + long slots_to_start; + int started = 0; - struct mxq_user_list *user, *unext=NULL; + struct mxq_user_list *ulist; + struct mxq_user_list *unext = NULL; assert(server); @@ -1363,27 +1216,27 @@ unsigned long start_users(struct mxq_server *server) mx_log_debug("=== starting jobs on free_slots=%lu slots for user_cnt=%lu users", server->slots - server->slots_running, 
server->user_cnt); - for (user=server->users; user; user=user->next) { + for (ulist = server->users; ulist; ulist = ulist->next) { - slots_to_start = server->slots / server->user_cnt - user->slots_running; + slots_to_start = server->slots / server->user_cnt - ulist->slots_running; if (slots_to_start < 0) continue; - if (server->slots - server->slots_running < slots_to_start) - slots_to_start = server->slots - server->slots_running; + if (slots_to_start > (server->slots - server->slots_running)) + slots_to_start = (server->slots - server->slots_running); - slots_started = start_user(user, 0, slots_to_start); + slots_started = start_user(ulist, 0, slots_to_start); slots_started_total += slots_started; } - for (user=server->users; user && server->slots - server->slots_running; user=unext) { + for (ulist = server->users; ulist && server->slots - server->slots_running; ulist = unext) { slots_to_start = server->slots - server->slots_running; - slots_started = start_user(user, 1, slots_to_start); + slots_started = start_user(ulist, 1, slots_to_start); slots_started_total += slots_started; started = (started || slots_started); - unext = user->next; + unext = ulist->next; if (!unext && started) { unext = server->users; started = 0; @@ -1395,58 +1248,68 @@ unsigned long start_users(struct mxq_server *server) /**********************************************************************/ -int remove_orphaned_groups(struct mxq_server *server) +int remove_orphaned_group_lists(struct mxq_server *server) { - struct mxq_user_list *user, *unext, *uprev; - struct mxq_group_list *group, *gnext, *gprev; + struct mxq_user_list *ulist, *unext, *uprev; + struct mxq_group_list *glist, *gnext, *gprev; + + struct mxq_group *group; + int cnt=0; - for (user=server->users, uprev=NULL; user; user=unext) { - unext = user->next; - for (group=user->groups, gprev=NULL; group; group=gnext) { - gnext = group->next; + for (ulist = server->users, uprev = NULL; ulist; ulist = unext) { + unext = ulist->next; - 
if (group->job_cnt) { - gprev = group; + for (glist = ulist->groups, gprev = NULL; glist; glist = gnext) { + gnext = glist->next; + group = &glist->group; + + if (glist->job_cnt) { + gprev = glist; continue; } - assert(!group->jobs); + assert(!glist->jobs); - if (!group->orphaned && mxq_group_jobs_active(&group->group)) { - group->orphaned = 1; - gprev = group; + if (!glist->orphaned && mxq_group_jobs_active(group)) { + glist->orphaned = 1; + gprev = glist; continue; } if (gprev) { gprev->next = gnext; } else { - assert(group == user->groups); - user->groups = gnext; + assert(glist == ulist->groups); + ulist->groups = gnext; } - mx_log_info("group=%s(%d):%lu : Removing orphaned group.", group->group.user_name, group->group.user_uid, group->group.group_id); + mx_log_info("group=%s(%d):%lu : Removing orphaned group.", + group->user_name, + group->user_uid, + group->group_id); - user->group_cnt--; + ulist->group_cnt--; server->group_cnt--; cnt++; - mxq_group_free_content(&group->group); - mx_free_null(group); + mxq_group_free_content(group); + mx_free_null(glist); } - if(user->groups) { - uprev = user; + + if(ulist->groups) { + uprev = ulist; continue; } if (uprev) { uprev->next = unext; } else { - assert(user == server->users); + assert(ulist == server->users); server->users = unext; } + server->user_cnt--; - mx_free_null(user); + mx_free_null(ulist); mx_log_info("Removed orphaned user. 
%lu users left.", server->user_cnt); } @@ -1455,84 +1318,124 @@ int remove_orphaned_groups(struct mxq_server *server) void server_dump(struct mxq_server *server) { - struct mxq_user_list *user; - struct mxq_group_list *group; - struct mxq_job_list *job; + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + struct mxq_job_list *jlist; + + struct mxq_group *group; + struct mxq_job *job; if (!server->user_cnt) return; mx_log_info("====================== SERVER DUMP START ======================"); - for (user=server->users; user; user=user->next) { + for (ulist = server->users; ulist; ulist = ulist->next) { + if (!ulist->groups) { + mx_log_fatal("BUG: missing group in userlist."); + continue; + } + group = &ulist->groups[0].group; mx_log_info(" user=%s(%d) slots_running=%lu", - user->groups->group.user_name, user->groups->group.user_uid, - user->slots_running); - for (group=user->groups; group; group=group->next) { + group->user_name, + group->user_uid, + ulist->slots_running); + + for (glist = ulist->groups; glist; glist = glist->next) { + group = &glist->group; mx_log_info(" group=%s(%d):%lu %s jobs_in_q=%lu", - group->group.user_name, group->group.user_uid, group->group.group_id, - group->group.group_name, mxq_group_jobs_inq(&group->group)); - for (job=group->jobs; job; job=job->next) { + group->user_name, + group->user_uid, + group->group_id, + group->group_name, + mxq_group_jobs_inq(group)); + for (jlist = glist->jobs; jlist; jlist = jlist->next) { + job = &jlist->job; mx_log_info(" job=%s(%d):%lu:%lu %s", - group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, - job->job.job_argv_str); + group->user_name, + group->user_uid, + group->group_id, + job->job_id, + job->job_argv_str); } } } - mx_log_info("memory_used=%lu memory_total=%lu", server->memory_used, server->memory_total); - mx_log_info("slots_running=%lu slots=%lu threads_running=%lu jobs_running=%lu", server->slots_running, server->slots, server->threads_running, 
server->jobs_running); - cpuset_log("cpu set running",&server->cpu_set_running); + mx_log_info("memory_used=%lu memory_total=%lu", + server->memory_used, + server->memory_total); + mx_log_info("slots_running=%lu slots=%lu threads_running=%lu jobs_running=%lu", + server->slots_running, + server->slots, + server->threads_running, + server->jobs_running); + cpuset_log("cpu set running", + &server->cpu_set_running); mx_log_info("====================== SERVER DUMP END ======================"); } -void server_close(struct mxq_server *server) +void server_free(struct mxq_server *server) { - struct mxq_user_list *user, *unext; - struct mxq_group_list *group, *gnext; - struct mxq_job_list *job, *jnext; - - for (user=server->users; user; user=unext) { - for (group=user->groups; group; group=gnext) { - for (job=group->jobs; job; job=jnext) { - jnext = job->next; - mxq_job_free_content(&job->job); - free(job); + struct mxq_user_list *ulist, *unext; + struct mxq_group_list *glist, *gnext; + struct mxq_job_list *jlist, *jnext; + + for (ulist = server->users; ulist; ulist = unext) { + for (glist = ulist->groups; glist; glist = gnext) { + for (jlist = glist->jobs; jlist; jlist = jnext) { + jnext = jlist->next; + mxq_job_free_content(&jlist->job); + mx_free_null(jlist); } - gnext = group->next; - mxq_group_free_content(&group->group); - free(group); + gnext = glist->next; + mxq_group_free_content(&glist->group); + mx_free_null(glist); } - unext = user->next; - free(user); + unext = ulist->next; + mx_free_null(ulist); } + mx_free_null(server->boot_id); + mx_free_null(server->host_id); + mx_free_null(server->finished_jobsdir); + mx_flock_free(server->flock); +} + +void server_close(struct mxq_server *server) +{ if (server->pidfilename) unlink(server->pidfilename); mx_funlock(server->flock); + server->flock = NULL; - mx_free_null(server->boot_id); - mx_free_null(server->host_id); + server_free(server); } int killall(struct mxq_server *server, int sig, unsigned int pgrp) { - struct 
mxq_user_list *user; - struct mxq_group_list *group; - struct mxq_job_list *job; + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + struct mxq_job_list *jlist; + + struct mxq_group *group; + struct mxq_job *job; + pid_t pid; assert(server); - for (user=server->users; user; user=user->next) { - for (group=user->groups; group; group=group->next) { - for (job=group->jobs; job; job=job->next) { - pid = job->job.host_pid; + for (ulist = server->users; ulist; ulist = ulist->next) { + for (glist = ulist->groups; glist; glist = glist->next) { + group = &glist->group; + + for (jlist = glist->jobs; jlist; jlist = jlist->next) { + job = &jlist->job; + pid = job->host_pid; if (pgrp) pid = -pid; mx_log_info("Sending signal=%d to job=%s(%d):%lu:%lu %s=%d", sig, - group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, + group->user_name, group->user_uid, group->group_id, job->job_id, pgrp?"pgrp":"pid", pid); kill(pid, sig); } @@ -1543,9 +1446,12 @@ int killall(struct mxq_server *server, int sig, unsigned int pgrp) int killall_over_time(struct mxq_server *server) { - struct mxq_user_list *user; - struct mxq_group_list *group; - struct mxq_job_list *job; + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + struct mxq_job_list *jlist; + + struct mxq_group *group; + struct mxq_job *job; struct timeval now; struct timeval delta; @@ -1560,58 +1466,42 @@ int killall_over_time(struct mxq_server *server) /* limit killing to every >= 60 seconds */ mx_within_rate_limit_or_return(60, 1); - mx_log_info("killall_over_time: Sending signals to all jobs running longer than requested."); + mx_log_debug("killall_over_time: Sending signals to all jobs running longer than requested."); gettimeofday(&now, NULL); - for (user=server->users; user; user=user->next) { - for (group=user->groups; group; group=group->next) { - for (job=group->jobs; job; job=job->next) { - timersub(&now, &job->job.stats_starttime, &delta); - - if (delta.tv_sec <= 
group->group.job_time*60) - continue; - - pid = job->job.host_pid; + for (ulist = server->users; ulist; ulist = ulist->next) { + for (glist = ulist->groups; glist; glist = glist->next) { + group = &glist->group; - if (delta.tv_sec <= group->group.job_time*61) { - mx_log_debug("killall_over_time(): Sending signal=XCPU to job=%s(%d):%lu:%lu pid=%d", - group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, pid); - kill(pid, SIGXCPU); - continue; - } + for (jlist = glist->jobs; jlist; jlist = jlist->next) { + job = &jlist->job; - mx_log_debug("killall_over_time(): Sending signal=XCPU to job=%s(%d):%lu:%lu pgrp=%d", - group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, pid); - kill(-pid, SIGXCPU); + timersub(&now, &job->stats_starttime, &delta); - if (delta.tv_sec <= group->group.job_time*63) + if (delta.tv_sec <= group->job_time*60) continue; - mx_log_info("killall_over_time(): Sending signal=TERM to job=%s(%d):%lu:%lu pid=%d", - group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, pid); - kill(pid, SIGTERM); + pid = job->host_pid; - mx_log_info("killall_over_time(): Sending signal=HUP to job=%s(%d):%lu:%lu pgrp=%d", - group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, pid); - kill(-pid, SIGHUP); + mx_log_info("killall_over_time(): Sending signal=XCPU to job=%s(%d):%lu:%lu pgrp=%d", + group->user_name, group->user_uid, group->group_id, job->job_id, pid); + kill(-pid, SIGCONT); + kill(-pid, SIGXCPU); - if (delta.tv_sec <= group->group.job_time*64) + if (delta.tv_sec <= group->job_time*63) continue; mx_log_info("killall_over_time(): Sending signal=TERM to job=%s(%d):%lu:%lu pgrp=%d", - group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, pid); + group->user_name, group->user_uid, group->group_id, job->job_id, pid); + kill(-pid, SIGCONT); kill(-pid, SIGTERM); - if (delta.tv_sec <= 
group->group.job_time*66) + if (delta.tv_sec <= group->job_time*66+60*10) continue; - mx_log_info("killall_over_time(): Sending signal=KILL to job=%s(%d):%lu:%lu pid=%d", - group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, pid); - kill(pid, SIGKILL); - mx_log_info("killall_over_time(): Sending signal=KILL to job=%s(%d):%lu:%lu pgrp=%d", - group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, pid); + group->user_name, group->user_uid, group->group_id, job->job_id, pid); kill(-pid, SIGKILL); } } @@ -1685,7 +1575,7 @@ int killall_over_memory(struct mxq_server *server) if (jlist->max_sum_rss/1024 <= group->job_memory) continue; - mx_log_info("killall_over_memory(): used(%lluMiB) > requested(%lluMiB): Sending signal=%d to job=%s(%d):%lu:%lu pid=%d", + mx_log_info("killall_over_memory(): used(%lluMiB) > requested(%lluMiB): Sending signal=%d to job=%s(%d):%lu:%lu pgrp=%d", jlist->max_sum_rss/1024, group->job_memory, signal, @@ -1695,7 +1585,8 @@ int killall_over_memory(struct mxq_server *server) job->job_id, job->host_pid); - kill(job->host_pid, signal); + kill(-job->host_pid, SIGCONT); + kill(-job->host_pid, signal); } } } @@ -1703,49 +1594,393 @@ int killall_over_memory(struct mxq_server *server) return 0; } -int killallcancelled(struct mxq_server *server, int sig, unsigned int pgrp) +int killall_cancelled(struct mxq_server *server) { - struct mxq_user_list *user; - struct mxq_group_list *group; - struct mxq_job_list *job; + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + struct mxq_job_list *jlist; + + struct mxq_group *group; + struct mxq_job *job; + pid_t pid; assert(server); - for (user=server->users; user; user=user->next) { - for (group=user->groups; group; group=group->next) { - if (group->group.group_status != MXQ_GROUP_STATUS_CANCELLED) + for (ulist = server->users; ulist; ulist = ulist->next) { + for (glist = ulist->groups; glist; glist = glist->next) { + group = 
&glist->group; + + if (group->group_status != MXQ_GROUP_STATUS_CANCELLED) continue; - if (group->jobs) + if (glist->jobs) mx_log_debug("Cancelling all running jobs in group=%s(%d):%lu", - group->group.user_name, group->group.user_uid, group->group.group_id); + group->user_name, group->user_uid, group->group_id); - for (job=group->jobs; job; job=job->next) { - pid = job->job.host_pid; - if (pgrp) - pid = -pid; - mx_log_info(" Sending signal=%d to job=%s(%d):%lu:%lu %s=%d", - sig, - group->group.user_name, group->group.user_uid, group->group.group_id, job->job.job_id, - pgrp?"pgrp":"pid", pid); - kill(pid, sig); + for (jlist = glist->jobs; jlist; jlist = jlist->next) { + job = &jlist->job; + + pid = job->host_pid; + mx_log_info(" Sending signal=TERM to job=%s(%d):%lu:%lu pgrp=%d", + group->user_name, + group->user_uid, + group->group_id, + job->job_id, + pid); + kill(-pid, SIGCONT); + kill(-pid, SIGTERM); } } } return 0; } -int catchall(struct mxq_server *server) { +static void rename_outfiles(struct mxq_group *group, struct mxq_job *job) +{ + int res; + + mxq_job_set_tmpfilenames(group, job); + + if (!mx_streq(job->job_stdout, "/dev/null")) { + res = rename(job->tmp_stdout, job->job_stdout); + if (res == -1) { + mx_log_err(" job=%s(%d):%lu:%lu host_pid=%d :: rename(stdout) failed: %m", + group->user_name, + group->user_uid, + group->group_id, + job->job_id, + job->host_pid); + } + } + + if (!mx_streq(job->job_stderr, "/dev/null") && !mx_streq(job->job_stderr, job->job_stdout)) { + res = rename(job->tmp_stderr, job->job_stderr); + if (res == -1) { + mx_log_err(" job=%s(%d):%lu:%lu host_pid=%d :: rename(stderr) failed: %m", + group->user_name, + group->user_uid, + group->group_id, + job->job_id, + job->host_pid); + } + } +} + +static int job_has_finished(struct mxq_server *server, struct mxq_group *group, struct mxq_job_list *jlist) +{ + int cnt; + struct mxq_job *job; + + job=&jlist->job; + + mxq_set_job_status_exited(server->mysql, job); + + if (job->job_status == 
MXQ_JOB_STATUS_FINISHED) { + group->group_jobs_finished++; + } else if(job->job_status == MXQ_JOB_STATUS_FAILED) { + group->group_jobs_failed++; + } else if(job->job_status == MXQ_JOB_STATUS_KILLED) { + group->group_jobs_failed++; + } + + rename_outfiles(group, job); + + cnt = jlist->group->slots_per_job; + cpuset_clear_running(&server->cpu_set_running, &job->host_cpu_set); + mxq_job_free_content(job); + free(jlist); + return cnt; +} + +static int job_is_lost(struct mxq_server *server,struct mxq_group *group, struct mxq_job_list *jlist) +{ + int cnt; + struct mxq_job *job; + + assert(jlist->group); + assert(!jlist->next); + + job = &jlist->job; + + mxq_set_job_status_unknown(server->mysql, job); + group->group_jobs_unknown++; + + rename_outfiles(group, job); + + cnt = jlist->group->slots_per_job; + cpuset_clear_running(&server->cpu_set_running, &job->host_cpu_set); + mxq_job_free_content(job); + free(jlist); + return cnt; +} + +static char *fspool_get_filename (struct mxq_server *server,long unsigned int job_id) +{ + char *fspool_filename; + mx_asprintf_forever(&fspool_filename,"%s/%lu.stat",server->finished_jobsdir,job_id); + return fspool_filename; +} + +static int fspool_process_file(struct mxq_server *server,char *filename,int job_id) { + FILE *in; + int res; + + pid_t pid; + int status; + struct rusage rusage; + struct timeval realtime; + + struct mxq_job_list *jlist; + struct mxq_job *job; + struct mxq_group *group; + + in=fopen(filename,"r"); + if (!in) { + return -errno; + } + errno=0; + res=fscanf(in,"1 %d %d %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld %ld", + &pid, + &status, + &realtime.tv_sec, + &realtime.tv_usec, + &rusage.ru_utime.tv_sec, + &rusage.ru_utime.tv_usec, + &rusage.ru_stime.tv_sec, + &rusage.ru_stime.tv_usec, + &rusage.ru_maxrss, + &rusage.ru_ixrss, + &rusage.ru_idrss, + &rusage.ru_isrss, + &rusage.ru_minflt, + &rusage.ru_majflt, + &rusage.ru_nswap, + &rusage.ru_inblock, + &rusage.ru_oublock, + 
&rusage.ru_msgsnd, + &rusage.ru_msgrcv, + &rusage.ru_nsignals, + &rusage.ru_nvcsw, + &rusage.ru_nivcsw); + fclose(in); + if (res!=22) { + mx_log_err("%s : parse error (res=%d)",filename,res); + if (!errno) + errno=EINVAL; + return -errno; + } + + mx_log_info("job finished (via fspool) : job %d pid %d status %d",job_id,pid,status); + + jlist = server_remove_job_list_by_pid(server, pid); + if (!jlist) { + mx_log_warning("fspool_process_file: %s : job unknown on server", filename); + return -(errno=ENOENT); + } + + job = &jlist->job; + if (job->job_id != job_id) { + mx_log_warning("fspool_process_file: %s: job_id(pid)[%ld] != job_id(filename)[%ld]", + filename, + job->job_id, + job_id); + return -(errno=EINVAL); + } + + assert(jlist->group); + + group = &jlist->group->group; + + job->stats_realtime = realtime; + job->stats_status = status; + job->stats_rusage = rusage; + + job_has_finished(server, group, jlist); + unlink(filename); + return(0); +} + +static int fspool_is_valid_name_parse(const char *name, unsigned long long int *job_id) { + const char *c=name; + if (!*c) + return 0; + if (!isdigit(*c++)) + return 0; + while(isdigit(*c)) { + c++; + } + if (strcmp(c,".stat")) { + return 0; + } + if (job_id) { + *job_id = strtoull(name, NULL, 10); + } + return 1; +} + +static int fspool_is_valid_name(const struct dirent *d) +{ + return fspool_is_valid_name_parse(d->d_name,NULL); +} + +static int fspool_scan(struct mxq_server *server) { + int cnt=0; + int entries; + struct dirent **namelist; + int i; + int res; + unsigned long long int job_id; + char *filename; + + + entries=scandir(server->finished_jobsdir,&namelist,&fspool_is_valid_name,&alphasort); + if (entries<0) { + mx_log_err("scandir %s: %m",server->finished_jobsdir); + return cnt; + } + + for (i=0;ifinished_jobsdir,namelist[i]->d_name); + fspool_is_valid_name_parse(namelist[i]->d_name,&job_id); + res=fspool_process_file(server,filename,job_id); + if (res==0) { + cnt++; + } + free(namelist[i]); + free(filename); + 
} + + free(namelist); + return cnt; +} + +static int file_exists(char *name) { + int res; + struct stat stat_buf; + + res=stat(name,&stat_buf); + if (res<0) { + if (errno==ENOENT) { + return 0; + } else { + mx_log_warning("%s: %m",name); + return 1; + } + } else { + return 1; + } +} + +static int fspool_file_exists(struct mxq_server *server,uint64_t job_id) { + _mx_cleanup_free_ char *fspool_filename=NULL; + fspool_filename=fspool_get_filename(server,job_id); + return file_exists(fspool_filename); +} + +static int lost_scan_one(struct mxq_server *server) +{ + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + struct mxq_job_list *jlist; + + struct mxq_job *job; + + int res; + + for (ulist = server->users; ulist; ulist = ulist->next) { + for (glist = ulist->groups; glist; glist = glist->next) { + for (jlist = glist->jobs; jlist; jlist = jlist->next) { + job = &jlist->job; + res = kill(job->host_pid, 0); + if (res >= 0) + continue; + + /* PID is not */ + + if (errno != ESRCH) + return -errno; + + if (!fspool_file_exists(server, job->job_id)) { + mx_log_warning("pid %u: process is gone. 
cancel job %d", + jlist->job.host_pid, + jlist->job.job_id); + server_remove_job_list_by_pid(server, job->host_pid); + + job->job_status = MXQ_JOB_STATUS_UNKNOWN; + + job_is_lost(server, &glist->group, jlist); + return 1; + } + } + } + } + return 0; +} + +static int lost_scan(struct mxq_server *server) +{ + int res; + int count=0; + do { + res=lost_scan_one(server); + if (res<0) + return res; + count+=res; + } while (res>0); + return count; +} + + +static int load_running_jobs(struct mxq_server *server) +{ + _mx_cleanup_free_ struct mxq_job *jobs = NULL; + + struct mxq_job_list *jlist; + struct mxq_group_list *glist; + + struct mxq_job *job; + + int job_cnt; + + int j; + + job_cnt = mxq_load_jobs_running_on_server(server->mysql, &jobs, server->hostname, server->server_id); + if (job_cnt < 0) + return job_cnt; + + for (j=0; j < job_cnt; j++) { + job = &jobs[j]; + + job->stats_starttime.tv_sec = job->date_start; + + jlist = server_get_job_list_by_job_id(server, job->job_id); + if (jlist) + continue; + + glist = server_get_group_list_by_group_id(server, job->group_id); + if (!glist) { + mx_log_fatal("BUG17: group %lu of job %lu not loaded. skipping job.", + job->group_id, job->job_id); + return -(errno=EUCLEAN); + } else { + group_list_add_job(glist, job); + } + } + return job_cnt; +} + +int catchall(struct mxq_server *server) +{ + struct mxq_job_list *jlist; + struct mxq_job *job; + struct mxq_group *group; struct rusage rusage; struct timeval now; int status; pid_t pid; int cnt = 0; - struct mxq_job_list *job; - struct mxq_job *j; - struct mxq_group *g; int res; while (1) { @@ -1768,22 +2003,34 @@ int catchall(struct mxq_server *server) { assert(siginfo.si_pid > 1); - job = server_remove_job_by_pid(server, siginfo.si_pid); - if (!job) { + jlist = server_get_job_list_by_pid(server, siginfo.si_pid); + if (!jlist) { mx_log_warning("unknown pid returned.. 
si_pid=%d si_uid=%d si_code=%d si_status=%d getpgid(si_pid)=%d getsid(si_pid)=%d", - siginfo.si_pid, siginfo.si_uid, siginfo.si_code, siginfo.si_status, - getpgid(siginfo.si_pid), getsid(siginfo.si_pid)); - pid = waitpid(siginfo.si_pid, &status, WNOHANG); + siginfo.si_pid, + siginfo.si_uid, + siginfo.si_code, + siginfo.si_status, + getpgid(siginfo.si_pid), + getsid(siginfo.si_pid)); + /* collect child, ignore status */ + pid = waitpid(siginfo.si_pid, NULL, WNOHANG); if (pid != siginfo.si_pid) mx_log_err("FIX ME BUG!!! pid=%d errno=%d (%m)", pid, errno); continue; } - /* valid job returned.. */ - /* kill possible leftovers with SIGKILL */ - res = kill(-siginfo.si_pid, SIGKILL); - if (res == -1) - mx_log_err("kill process group pgrp=%d failed: %m", -siginfo.si_pid); + assert(jlist); + assert(jlist->group); + + job = &jlist->job; + group = &jlist->group->group; + + if (fspool_file_exists(server, job->job_id)) { + waitpid(siginfo.si_pid, &status, WNOHANG); + continue; + } + mx_log_err("reaper died. status=%d. 
Cleaning up job from catchall.",status); + job_list_remove_self(jlist); /* reap child and save new state */ pid = wait4(siginfo.si_pid, &status, WNOHANG, &rusage); @@ -1802,111 +2049,107 @@ int catchall(struct mxq_server *server) { gettimeofday(&now, NULL); - j = &job->job; - assert(job->group); - g = &job->group->group; - timersub(&now, &j->stats_starttime, &j->stats_realtime); - j->stats_max_sumrss = job->max_sum_rss; - j->stats_status = status; - j->stats_rusage = rusage; + timersub(&now, &job->stats_starttime, &job->stats_realtime); + job->stats_max_sumrss = jlist->max_sum_rss; + job->stats_status = status; + job->stats_rusage = rusage; mx_log_info(" job=%s(%d):%lu:%lu host_pid=%d stats_status=%d :: child process returned.", - g->user_name, g->user_uid, g->group_id, j->job_id, pid, status); - - mxq_set_job_status_exited(server->mysql, j); - - if (j->job_status == MXQ_JOB_STATUS_FINISHED) { - g->group_jobs_finished++; - } else if(j->job_status == MXQ_JOB_STATUS_FAILED) { - g->group_jobs_failed++; - } else if(j->job_status == MXQ_JOB_STATUS_KILLED) { - g->group_jobs_failed++; - } - - mxq_job_set_tmpfilenames(g, j); - - if (!mx_streq(j->job_stdout, "/dev/null")) { - res = rename(j->tmp_stdout, j->job_stdout); - if (res == -1) { - mx_log_err(" job=%s(%d):%lu:%lu host_pid=%d :: rename(stdout) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id, pid); - } - } - - if (!mx_streq(j->job_stderr, "/dev/null") && !mx_streq(j->job_stderr, j->job_stdout)) { - res = rename(j->tmp_stderr, j->job_stderr); - if (res == -1) { - mx_log_err(" job=%s(%d):%lu:%lu host_pid=%d :: rename(stderr) failed: %m", - g->user_name, g->user_uid, g->group_id, j->job_id, pid); - } - } + group->user_name, + group->user_uid, + group->group_id, + job->job_id, + pid, + status); - cnt += job->group->slots_per_job; - cpuset_clear_running(&server->cpu_set_running,&j->host_cpu_set); - mxq_job_free_content(j); - free(job); + cnt += job_has_finished(server, group, jlist); } return cnt; } -int 
load_groups(struct mxq_server *server) { - struct mxq_group *mxqgroups = NULL; - struct mxq_group_list *group; - int group_cnt; +int load_running_groups(struct mxq_server *server) +{ + struct mxq_group_list *glist; + struct mxq_group *grps; + struct mxq_group *group; + + int grp_cnt; int total; int i; + assert(server); + + grps = NULL; + if (RUNNING_AS_ROOT) - group_cnt = mxq_load_running_groups(server->mysql, &mxqgroups); + grp_cnt = mxq_load_running_groups(server->mysql, &grps); else - group_cnt = mxq_load_running_groups_for_user(server->mysql, &mxqgroups, getuid()); + grp_cnt = mxq_load_running_groups_for_user(server->mysql, &grps, getuid()); - for (i=0, total=0; imysql); assert(server->hostname); assert(server->server_id); - res1 = mxq_unassign_jobs_of_server(server->mysql, server->hostname, server->server_id); - if (res1 < 0) { + res = mxq_unassign_jobs_of_server(server->mysql, server->hostname, server->server_id); + if (res < 0) { mx_log_info("mxq_unassign_jobs_of_server() failed: %m"); - return res1; + return res; } - if (res1 > 0) + if (res > 0) mx_log_info("hostname=%s server_id=%s :: recovered from previous crash: unassigned %d jobs.", - server->hostname, server->server_id, res1); + server->hostname, server->server_id, res); - res2 = mxq_set_job_status_unknown_for_server(server->mysql, server->hostname, server->server_id); - if (res2 < 0) { - mx_log_info("mxq_unassign_jobs_of_server() failed: %m"); - return res2; + res = load_running_groups(server); + mx_log_info("recover: %d running groups loaded.", res); + + res = load_running_jobs(server); + if (res < 0) { + mx_log_err("recover: load_running_jobs: %m"); + return res; + } + if (res > 0) + mx_log_info("recover: reload %d running jobs from database", res); + + res=fspool_scan(server); + if (res<0) { + mx_log_err("recover: server_fspool_scan: %m"); + return res; } - if (res2 > 0) - mx_log_info("hostname=%s server_id=%s :: recovered from previous crash: set job_status='unknown' for %d jobs.", - 
server->hostname, server->server_id, res2); + if (res>0) + mx_log_info("recover: processed %d finished jobs from fspool",res); - return res1+res2; + res=lost_scan(server); + if (res<0) { + mx_log_err("recover: lost_scan: %m"); + return(res); + } + if (res>0) + mx_log_warning("recover: %d jobs vanished from the system",res); + + return 0; } /**********************************************************************/ @@ -1923,6 +2166,11 @@ static void sig_handler(int sig) global_sigterm_cnt++; return; } + + if (sig == SIGQUIT) { + global_sigquit_cnt++; + return; + } } int main(int argc, char *argv[]) @@ -1968,7 +2216,7 @@ int main(int argc, char *argv[]) signal(SIGINT, sig_handler); signal(SIGTERM, sig_handler); - signal(SIGQUIT, SIG_IGN); + signal(SIGQUIT, sig_handler); signal(SIGHUP, SIG_IGN); signal(SIGTSTP, SIG_IGN); signal(SIGTTIN, SIG_IGN); @@ -1980,14 +2228,14 @@ int main(int argc, char *argv[]) mx_log_warning("recover_from_previous_crash() failed. Aborting execution."); fail = 1; } - if (res > 0) - mx_log_warning("total %d jobs recovered from previous crash.", res); if (server.recoveronly) fail = 1; - while (!global_sigint_cnt && !global_sigterm_cnt && !fail) { + while (!global_sigint_cnt && !global_sigterm_cnt && !global_sigquit_cnt && !fail) { slots_returned = catchall(&server); + slots_returned += fspool_scan(&server); + if (slots_returned) mx_log_info("slots_returned=%lu :: Main Loop freed %lu slots.", slots_returned, slots_returned); @@ -1996,12 +2244,11 @@ int main(int argc, char *argv[]) slots_started = 0; } - group_cnt = load_groups(&server); + group_cnt = load_running_groups(&server); if (group_cnt) mx_log_debug("group_cnt=%d :: %d Groups loaded", group_cnt, group_cnt); - killallcancelled(&server, SIGTERM, 0); - killallcancelled(&server, SIGINT, 0); + killall_cancelled(&server); killall_over_time(&server); killall_over_memory(&server); @@ -2036,26 +2283,28 @@ int main(int argc, char *argv[]) } /*** clean up ***/ - mx_log_info("global_sigint_cnt=%d 
global_sigterm_cnt=%d : Exiting.", global_sigint_cnt, global_sigterm_cnt); + mx_log_info("global_sigint_cnt=%d global_sigterm_cnt=%d global_sigquit_cnt=%d: Exiting.", global_sigint_cnt, global_sigterm_cnt,global_sigquit_cnt); - while (server.jobs_running) { - slots_returned = catchall(&server); - if (slots_returned) { - mx_log_info("jobs_running=%lu slots_returned=%lu global_sigint_cnt=%d global_sigterm_cnt=%d :", - server.jobs_running, slots_returned, global_sigint_cnt, global_sigterm_cnt); - continue; - } - if (global_sigint_cnt) - killall(&server, SIGTERM, 0); - - killallcancelled(&server, SIGTERM, 0); - killallcancelled(&server, SIGINT, 0); - killall_over_time(&server); - killall_over_memory(&server); + if (global_sigterm_cnt||global_sigint_cnt) { + while (server.jobs_running) { + slots_returned = catchall(&server); + slots_returned += fspool_scan(&server); - mx_log_info("jobs_running=%lu global_sigint_cnt=%d global_sigterm_cnt=%d : Exiting. Wating for jobs to finish. Sleeping for a while.", - server.jobs_running, global_sigint_cnt, global_sigterm_cnt); - sleep(1); + if (slots_returned) { + mx_log_info("jobs_running=%lu slots_returned=%lu global_sigint_cnt=%d global_sigterm_cnt=%d :", + server.jobs_running, slots_returned, global_sigint_cnt, global_sigterm_cnt); + continue; + } + if (global_sigint_cnt) + killall(&server, SIGTERM, 1); + + killall_cancelled(&server); + killall_over_time(&server); + killall_over_memory(&server); + mx_log_info("jobs_running=%lu global_sigint_cnt=%d global_sigterm_cnt=%d : Exiting. Wating for jobs to finish. 
Sleeping for a while.", + server.jobs_running, global_sigint_cnt, global_sigterm_cnt); + sleep(1); + } } mx_mysql_finish(&(server.mysql)); diff --git a/mxqd.h b/mxqd.h index 4bc1cd0a..8b115a99 100644 --- a/mxqd.h +++ b/mxqd.h @@ -2,6 +2,10 @@ #define __MXQ_SERVER_H__ 1 #include "mx_mysql.h" + +#include "mxq_job.h" +#include "mxq_group.h" + #include struct mxq_job_list { @@ -11,8 +15,6 @@ struct mxq_job_list { struct mxq_job job; unsigned long long int max_sum_rss; - - pid_t pid; }; struct mxq_group_list { @@ -81,6 +83,7 @@ struct mxq_server { char *hostname; char *server_id; char *pidfilename; + char *finished_jobsdir; struct mx_flock *flock; char *initial_path; diff --git a/mxqd_control.c b/mxqd_control.c new file mode 100644 index 00000000..9055f434 --- /dev/null +++ b/mxqd_control.c @@ -0,0 +1,385 @@ +#define _GNU_SOURCE + +#include +#include +#include + +#include "mxq_job.h" +#include "mxq_group.h" + +#include "mxqd.h" + +static void _group_list_init(struct mxq_group_list *glist) +{ + struct mxq_server *server; + struct mxq_group *group; + + long double memory_threads; + long double memory_per_thread; + long double memory_max_available; + + unsigned long slots_per_job; + unsigned long jobs_max; + unsigned long slots_max; + unsigned long memory_max; + + assert(glist); + assert(glist->user); + assert(glist->user->server); + + server = glist->user->server; + group = &glist->group; + + memory_per_thread = (long double)group->job_memory / (long double)group->job_threads; + memory_max_available = (long double)server->memory_total * (long double)server->memory_max_per_slot / memory_per_thread; + + if (memory_max_available > server->memory_total) + memory_max_available = server->memory_total; + + slots_per_job = ceill((long double)group->job_memory / server->memory_avg_per_slot); + + if (slots_per_job < group->job_threads) + slots_per_job = group->job_threads; + + memory_threads = memory_max_available / memory_per_thread; + + if (memory_per_thread > 
server->memory_max_per_slot) { + jobs_max = memory_threads + 0.5; + } else if (memory_per_thread > server->memory_avg_per_slot) { + jobs_max = memory_threads + 0.5; + } else { + jobs_max = server->slots; + } + jobs_max /= group->job_threads; + + /* limit maximum number of jobs on user/group request */ + if (group->job_max_per_node && jobs_max > group->job_max_per_node) + jobs_max = group->job_max_per_node; + + slots_max = jobs_max * slots_per_job; + memory_max = jobs_max * group->job_memory; + + if (glist->memory_per_thread != memory_per_thread + || glist->memory_max_available != memory_max_available + || glist->memory_max_available != memory_max_available + || glist->slots_per_job != slots_per_job + || glist->jobs_max != jobs_max + || glist->slots_max != slots_max + || glist->memory_max != memory_max) { + mx_log_info(" group=%s(%u):%lu jobs_max=%lu slots_max=%lu memory_max=%lu slots_per_job=%lu :: group %sinitialized.", + group->user_name, + group->user_uid, + group->group_id, + jobs_max, + slots_max, + memory_max, + slots_per_job, + glist->orphaned ? 
"re" : ""); + } + + glist->memory_per_thread = memory_per_thread; + glist->memory_max_available = memory_max_available; + + glist->slots_per_job = slots_per_job; + + glist->jobs_max = jobs_max; + glist->slots_max = slots_max; + glist->memory_max = memory_max; + + glist->orphaned = 0; +} + +struct mxq_group_list *server_get_group_list_by_group_id(struct mxq_server *server, uint64_t group_id) +{ + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + + struct mxq_group *group; + + for (ulist = server->users; ulist; ulist = ulist->next) { + for (glist = ulist->groups; glist; glist = glist->next) { + group = &glist->group; + if (group->group_id == group_id) + return glist; + } + } + return NULL; +} + +struct mxq_job_list *server_get_job_list_by_job_id(struct mxq_server *server, uint64_t job_id) +{ + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + struct mxq_job_list *jlist; + + struct mxq_job *job; + + for (ulist = server->users; ulist; ulist = ulist->next) { + for (glist = ulist->groups; glist; glist = glist->next) { + for (jlist = glist->jobs; jlist; jlist = jlist->next) { + job = &jlist->job; + if (job->job_id == job_id) + return jlist; + } + } + } + return NULL; +} + +struct mxq_job_list *server_get_job_list_by_pid(struct mxq_server *server, pid_t pid) +{ + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + struct mxq_job_list *jlist; + + struct mxq_job *job; + + assert(server); + + for (ulist = server->users; ulist; ulist = ulist->next) { + for (glist = ulist->groups; glist; glist = glist->next) { + for (jlist = glist->jobs; jlist; jlist = jlist->next) { + job = &jlist->job; + if (job->host_pid == pid) + return jlist; + } + } + } + return NULL; +} + +void job_list_remove_self(struct mxq_job_list *jlist) +{ + struct mxq_group_list *glist; + struct mxq_user_list *ulist; + struct mxq_server *server; + + struct mxq_job_list **jprevp; + + struct mxq_job *job; + struct mxq_group *group; + + assert(jlist); + assert(jlist->group); + 
assert(jlist->group->user); + assert(jlist->group->user->server); + + glist = jlist->group; + ulist = glist->user; + server = ulist->server; + + group = &glist->group; + job = &jlist->job; + + for (jprevp = &glist->jobs; *jprevp; jprevp = &(*jprevp)->next) { + if (*jprevp != jlist) + continue; + + *jprevp = jlist->next; + + glist->job_cnt--; + ulist->job_cnt--; + server->job_cnt--; + + glist->slots_running -= job->host_slots; + ulist->slots_running -= job->host_slots; + server->slots_running -= job->host_slots; + + glist->threads_running -= group->job_threads; + ulist->threads_running -= group->job_threads; + server->threads_running -= group->job_threads; + + group->group_jobs_running--; + + glist->jobs_running--; + ulist->jobs_running--; + server->jobs_running--; + + glist->memory_used -= group->job_memory; + ulist->memory_used -= group->job_memory; + server->memory_used -= group->job_memory; + break; + } +} + +struct mxq_job_list *server_remove_job_list_by_pid(struct mxq_server *server, pid_t pid) +{ + struct mxq_job_list *jlist; + + assert(server); + + jlist = server_get_job_list_by_pid(server, pid); + if (jlist) { + job_list_remove_self(jlist); + } + return jlist; +} + +static struct mxq_user_list *_user_list_find_by_uid(struct mxq_user_list *ulist, uint32_t uid) +{ + for (; ulist; ulist = ulist->next) { + assert(ulist->groups); + + if (ulist->groups[0].group.user_uid == uid) { + return ulist; + } + } + return NULL; +} + +struct mxq_user_list *server_find_user_by_uid(struct mxq_server *server, uint32_t uid) +{ + assert(server); + + return _user_list_find_by_uid(server->users, uid); +} + +struct mxq_group_list *_group_list_find_by_group(struct mxq_group_list *glist, struct mxq_group *group) +{ + assert(group); + + for (; glist; glist = glist->next) { + if (glist->group.group_id == group->group_id) { + return glist; + } + } + return NULL; +} + +struct mxq_job_list *group_list_add_job(struct mxq_group_list *glist, struct mxq_job *job) +{ + struct mxq_server 
*server; + + struct mxq_job_list *jlist; + struct mxq_user_list *ulist; + + struct mxq_group *group; + + assert(glist); + assert(glist->user); + assert(glist->user->server); + assert(job->job_status == MXQ_JOB_STATUS_RUNNING); + + group = &glist->group; + ulist = glist->user; + server = ulist->server; + + jlist = mx_calloc_forever(1, sizeof(*jlist)); + + memcpy(&jlist->job, job, sizeof(*job)); + + jlist->group = glist; + + jlist->next = glist->jobs; + glist->jobs = jlist; + + glist->job_cnt++; + ulist->job_cnt++; + server->job_cnt++; + + glist->slots_running += glist->slots_per_job; + ulist->slots_running += glist->slots_per_job; + server->slots_running += glist->slots_per_job; + + glist->threads_running += group->job_threads; + ulist->threads_running += group->job_threads; + server->threads_running += group->job_threads; + + CPU_OR(&server->cpu_set_running, &server->cpu_set_running, &job->host_cpu_set); + + group->group_jobs_running++; + group->group_jobs_inq--; + + glist->jobs_running++; + ulist->jobs_running++; + server->jobs_running++; + + glist->memory_used += group->job_memory; + ulist->memory_used += group->job_memory; + server->memory_used += group->job_memory; + + return jlist; +} + +struct mxq_group_list *_user_list_add_group(struct mxq_user_list *ulist, struct mxq_group *group) +{ + struct mxq_group_list *glist; + struct mxq_server *server; + + assert(ulist); + assert(ulist->server); + + server = ulist->server; + + glist = mx_calloc_forever(1, sizeof(*glist)); + + memcpy(&glist->group, group, sizeof(*group)); + + glist->user = ulist; + + glist->next = ulist->groups; + ulist->groups = glist; + + ulist->group_cnt++; + server->group_cnt++; + + _group_list_init(glist); + + return glist; +} + +struct mxq_group_list *_server_add_group(struct mxq_server *server, struct mxq_group *group) +{ + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + + assert(server); + assert(group); + + ulist = mx_calloc_forever(1, sizeof(*ulist)); + + ulist->server = 
server; + + ulist->next = server->users; + server->users = ulist; + + server->user_cnt++; + + glist = _user_list_add_group(ulist, group); + assert(glist); + + return glist; +} + +static struct mxq_group_list *_user_list_update_group(struct mxq_user_list *ulist, struct mxq_group *group) +{ + struct mxq_group_list *glist; + + assert(ulist); + assert(group); + + glist = _group_list_find_by_group(ulist->groups, group); + if (!glist) { + return _user_list_add_group(ulist, group); + } + + mxq_group_free_content(&glist->group); + + memcpy(&glist->group, group, sizeof(*group)); + + _group_list_init(glist); + + return glist; +} + +struct mxq_group_list *server_update_group(struct mxq_server *server, struct mxq_group *group) +{ + struct mxq_user_list *ulist; + + ulist = _user_list_find_by_uid(server->users, group->user_uid); + if (!ulist) { + return _server_add_group(server, group); + } + + return _user_list_update_group(ulist, group); +} diff --git a/mxqd_control.h b/mxqd_control.h new file mode 100644 index 00000000..3a8d69e9 --- /dev/null +++ b/mxqd_control.h @@ -0,0 +1,29 @@ +#ifndef _MXQD_CONTROL_H +#define _MXQD_CONTROL_H + +#include "mxqd.h" +#include "mxq_group.h" +#include "mxq_job.h" + +void job_list_remove_self(struct mxq_job_list *jlist); +struct mxq_job_list *server_remove_job_list_by_pid(struct mxq_server *server, pid_t pid); +struct mxq_user_list *server_find_user_by_uid(struct mxq_server *server, uint32_t uid); +struct mxq_group_list *_group_list_find_by_group(struct mxq_group_list *glist, struct mxq_group *group); +struct mxq_job_list *group_list_add_job(struct mxq_group_list *glist, struct mxq_job *job); +struct mxq_group_list *server_update_group(struct mxq_server *server, struct mxq_group *group); + +struct mxq_group_list *server_get_group_list_by_group_id(struct mxq_server *server, uint64_t group_id); +struct mxq_job_list *server_get_job_list_by_job_id(struct mxq_server *server, uint64_t job_id); +struct mxq_job_list *server_get_job_list_by_pid(struct 
mxq_server *server, pid_t pid); + +/* +static void _group_list_init(struct mxq_group_list *glist) +static struct mxq_user_list *_user_list_find_by_uid(struct mxq_user_list *ulist, uint32_t uid) + +static struct mxq_group_list *_user_list_update_group(struct mxq_user_list *ulist, struct mxq_group *group) +*/ + +struct mxq_group_list *_server_add_group(struct mxq_server *server, struct mxq_group *group); +struct mxq_group_list *_user_list_add_group(struct mxq_user_list *ulist, struct mxq_group *group); + +#endif diff --git a/mysql/alter_tables_0.18.2.sql b/mysql/alter_tables_0.18.2.sql new file mode 100644 index 00000000..1be5d73d --- /dev/null +++ b/mysql/alter_tables_0.18.2.sql @@ -0,0 +1,5 @@ +ALTER TABLE mxq_job + ADD COLUMN + host_cpu_set VARCHAR(4095) NOT NULL DEFAULT "" + AFTER + host_slots; diff --git a/mysql/create_tables.sql b/mysql/create_tables.sql index f2566cc6..f589f10a 100644 --- a/mysql/create_tables.sql +++ b/mysql/create_tables.sql @@ -87,6 +87,7 @@ CREATE TABLE IF NOT EXISTS mxq_job ( host_hostname VARCHAR(64) NOT NULL DEFAULT "", host_pid INT4 UNSIGNED NOT NULL DEFAULT 0, host_slots INT4 UNSIGNED NOT NULL DEFAULT 0, + host_cpu_set VARCHAR(4095) NOT NULL DEFAULT "", date_submit TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, date_start TIMESTAMP NOT NULL DEFAULT 0, diff --git a/test_mxqd_control.c b/test_mxqd_control.c new file mode 100644 index 00000000..ef115661 --- /dev/null +++ b/test_mxqd_control.c @@ -0,0 +1,37 @@ + +#define _GNU_SOURCE + +#include + +#include "mxqd.h" + +#define MEMORY_TOTAL 20480 +#define MEMORY_MAX_PER_SLOT 2048 +#define SLOTS 10 + +void __init_server(struct mxq_server *server) +{ + server->memory_total = MEMORY_TOTAL; + server->memory_max_per_slot = MEMORY_MAX_PER_SLOT; + server->slots = SLOTS; + server->memory_avg_per_slot = MEMORY_TOTAL / SLOTS; +} + + +static void test_mxqd_control(void) +{ + struct mxq_server _server = { 0 }; + struct mxq_server *server; + + server = &_server; + + __init_server(server); + + assert(1); 
+} + +int main(int argc, char *argv[]) +{ + test_mxqd_control(); + return 0; +} diff --git a/web/pages/mxq/mxq.in b/web/pages/mxq/mxq.in index 422c14b4..0db5c219 100755 --- a/web/pages/mxq/mxq.in +++ b/web/pages/mxq/mxq.in @@ -356,6 +356,7 @@ host_id : $o{host_id} host_hostname : $o{host_hostname} host_pid : $o{host_pid} host_slots : $o{host_slots} +host_cpu_set : $o{host_cpu_set} date_submit : $o{date_submit} date_start : $o{date_start} $ago