diff --git a/Makefile b/Makefile
index d4c993ba..488da488 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
 MXQ_VERSION_MAJOR = 0
 MXQ_VERSION_MINOR = 30
-MXQ_VERSION_PATCH = 2
+MXQ_VERSION_PATCH = 3
 MXQ_VERSION_EXTRA = "beta"
 MXQ_VERSIONDATE = 2022
 
@@ -121,6 +121,7 @@ CFLAGS_MYSQL += ${CFLAGS_MXQ_MYSQL_DEFAULT_GROUP}
 CFLAGS_MYSQL += -DMX_MYSQL_FAIL_WAIT_DEFAULT=5
 
 CFLAGS += -g
+CFLAGS += -O3
 CFLAGS += -Wall
 CFLAGS += -DMXQ_VERSION=\"${MXQ_VERSION}\"
 CFLAGS += -DMXQ_VERSIONFULL=\"${MXQ_VERSIONFULL}\"
diff --git a/mx_util.c b/mx_util.c
index 8b675159..66a51f28 100644
--- a/mx_util.c
+++ b/mx_util.c
@@ -356,7 +356,7 @@ int mx_strtoll(char *str, signed long long int *to)
 
 int mx_strtoui(char *str, unsigned int *to)
 {
-    unsigned long int ul;
+    unsigned long int ul = 0; /* avoid false maybe-uninitialized warning */
     int res;
 
     assert(str);
@@ -376,7 +376,7 @@ int mx_strtoui(char *str, unsigned int *to)
 
 int mx_strtou8(char *str, uint8_t *to)
 {
-    unsigned long int ul;
+    unsigned long int ul = 0; /* avoid false maybe-uninitialized warning */
     int res;
 
     assert(str);
@@ -396,7 +396,7 @@ int mx_strtou8(char *str, uint8_t *to)
 
 int mx_strtou16(char *str, uint16_t *to)
 {
-    unsigned long int ul;
+    unsigned long int ul = 0; /* avoid false maybe-uninitialized warning */
     int res;
 
     assert(str);
@@ -416,7 +416,7 @@ int mx_strtou16(char *str, uint16_t *to)
 
 int mx_strtou32(char *str, uint32_t *to)
 {
-    unsigned long int ul;
+    unsigned long int ul = 0; /* avoid false maybe-uninitialized warning */
     int res;
 
     assert(str);
@@ -436,7 +436,7 @@ int mx_strtou32(char *str, uint32_t *to)
 
 int mx_strtou64(char *str, uint64_t *to)
 {
-    unsigned long long int ull;
+    unsigned long long int ull = 0; /* avoid false maybe-uninitialized warning */
     int res;
 
     assert(str);
@@ -458,7 +458,7 @@ int mx_strtou64(char *str, uint64_t *to)
 
 int mx_strtoi(char *str, signed int *to)
 {
-    signed long int l;
+    signed long int l = 0; /* avoid false maybe-uninitialized warning */
     int res;
 
     assert(str);
@@ -478,7 +478,7 @@ int mx_strtoi(char *str, signed int *to)
 
 int mx_strtoi8(char *str, int8_t *to)
 {
-    signed long int l;
+    signed long int l = 0; /* avoid false maybe-uninitialized warning */
     int res;
 
     assert(str);
@@ -498,7 +498,7 @@ int mx_strtoi8(char *str, int8_t *to)
 
 int mx_strtoi16(char *str, int16_t *to)
 {
-    signed long int l;
+    signed long int l = 0; /* avoid false maybe-uninitialized warning */
     int res;
 
     assert(str);
@@ -518,7 +518,7 @@ int mx_strtoi16(char *str, int16_t *to)
 
 int mx_strtoi32(char *str, int32_t *to)
 {
-    signed long int l;
+    signed long int l = 0; /* avoid false maybe-uninitialized warning */
     int res;
 
     assert(str);
@@ -538,7 +538,7 @@ int mx_strtoi32(char *str, int32_t *to)
 
 int mx_strtoi64(char *str, int64_t *to)
 {
-    signed long long int ll;
+    signed long long int ll = 0; /* avoid false maybe-uninitialized warning */
     int res;
 
     assert(str);
@@ -797,7 +797,7 @@ int mx_read_first_line_from_file(char *fname, char **line)
 
 int mx_strscan_ull(char **str, unsigned long long int *to)
 {
-    unsigned long long int l;
+    unsigned long long int l = 0; /* avoid false maybe-uninitialized warning */
     char *s;
     char *p;
     char o = 0;
@@ -828,7 +828,7 @@ int mx_strscan_ull(char **str, unsigned long long int *to)
 
 int mx_strscan_ll(char **str, long long int *to)
 {
-    long long int l;
+    long long int l = 0; /* avoid false maybe-uninitialized warning */
     char *s;
     char *p;
     char o = 0;
@@ -1268,6 +1268,7 @@ int mx_daemon(int nochdir, int noclose)
     return daemon(nochdir, noclose);
 }
 
+/* guarantee stable sort */
 void _mx_sort_linked_list (void **list, int (*cmp)(void *o1,void *o2), void ** getnextptr(void *o))
 {
     void *unsorted=*list;
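The = 0 initializations in mx_util.c belong together with the new -O3 in the Makefile: every mx_strto*() helper writes its result through the out pointer only on the success path, and callers read the value only after checking the return code, but once gcc starts inlining at -O3 it cannot always prove that and may raise a spurious -Wmaybe-uninitialized warning. A minimal sketch of the pattern, with hypothetical names that are not part of mxq:

    #include <errno.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Writes *to only on success, the same contract as the mx_strto*() helpers. */
    static int parse_port(const char *str, unsigned long *to)
    {
        char *end;
        unsigned long v = strtoul(str, &end, 10);
        if (*end != '\0' || v > 65535)
            return -EINVAL;
        *to = v;                        /* only reached on success */
        return 0;
    }

    int main(void)
    {
        unsigned long port;             /* gcc -O3 may warn "maybe-uninitialized" here */
        if (parse_port("8080", &port) == 0)
            printf("port=%lu\n", port); /* read only after the success check */
        return 0;
    }

Initializing the local to 0, as the patch does, silences the warning without changing behaviour.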
diff --git a/mxqd.c b/mxqd.c
index 32be11c0..a4c35a61 100644
--- a/mxqd.c
+++ b/mxqd.c
@@ -64,8 +64,8 @@ static int global_sigrestart_cnt=0;
 
 static sigset_t all_signals;
 
-int mxq_redirect_output(char *stdout_fname, char *stderr_fname);
-void server_free(struct mxq_server *server);
+static int mxq_redirect_output(char *stdout_fname, char *stderr_fname);
+static void server_free(struct mxq_server *server);
 
 static void print_usage(void)
 {
@@ -157,7 +157,7 @@ static void cpuset_clear_running(cpu_set_t *running,cpu_set_t *job) {
 }
 
 /**********************************************************************/
-int setup_cronolog(char *cronolog, char *logdir, char *rellink, char *relformat)
+static int setup_cronolog(char *cronolog, char *logdir, char *rellink, char *relformat)
 {
     int res;
     int pipe_fd[2];
@@ -221,7 +221,7 @@ int setup_cronolog(char *cronolog, char *logdir, char *rellink, char *relformat)
 }
 
 
-int setup_stdin(char *fname)
+static int setup_stdin(char *fname)
 {
     int fh;
     int res;
@@ -247,7 +247,7 @@ int setup_stdin(char *fname)
     return 1;
 }
 
-int write_pid_to_file(char *fname)
+static int write_pid_to_file(char *fname)
 {
     int fd;
     int res;
@@ -265,7 +265,7 @@ int write_pid_to_file(char *fname)
     return 0;
 }
 
-int server_update_daemon_statistics(struct mxq_server *server)
+static int server_update_daemon_statistics(struct mxq_server *server)
 {
     struct mxq_daemon *daemon;
 
@@ -400,7 +400,7 @@ static int expression_is_valid(char *expr) {
     return 1;
 }
 
-int server_init(struct mxq_server *server, int argc, char *argv[])
+static int server_init(struct mxq_server *server, int argc, char *argv[])
 {
     assert(server);
 
@@ -1007,7 +1007,7 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job)
 
 /**********************************************************************/
 
-int mxq_redirect_open(char *fname)
+static int mxq_redirect_open(char *fname)
 {
     int fh;
     int res;
@@ -1036,7 +1036,7 @@ int mxq_redirect_open(char *fname)
 }
 
 
-int mxq_redirect(char *fname, int fd)
+static int mxq_redirect(char *fname, int fd)
 {
     int fh;
     int res;
@@ -1052,7 +1052,7 @@ int mxq_redirect(char *fname, int fd)
     return 0;
 }
 
-int mxq_redirect_output(char *stdout_fname, char *stderr_fname)
+static int mxq_redirect_output(char *stdout_fname, char *stderr_fname)
 {
     int res;
 
@@ -1077,7 +1077,7 @@ int mxq_redirect_output(char *stdout_fname, char *stderr_fname)
     return 0;
 }
 
-int mxq_redirect_input(char *stdin_fname)
+static int mxq_redirect_input(char *stdin_fname)
 {
     int fh;
     int res;
@@ -1096,7 +1096,7 @@ int mxq_redirect_input(char *stdin_fname)
     return 1;
 }
 
-int user_process(struct mxq_group_list *glist, struct mxq_job *job)
+static int user_process(struct mxq_group_list *glist, struct mxq_job *job)
 {
     int res;
     char **argv;
@@ -1166,10 +1166,10 @@ static int is_reaper(pid_t pid) {
     return 0;
 }
 
-int reaper_process(struct mxq_server *server,struct mxq_group_list *glist, struct mxq_job *job) {
+static int reaper_process(struct mxq_server *server,struct mxq_group_list *glist, struct mxq_job *job) {
     pid_t pid;
     struct rusage rusage;
-    int status;
+    int status = 0;
     pid_t waited_pid;
     int waited_status;
     struct timeval now;
@@ -1282,7 +1282,7 @@ int reaper_process(struct mxq_server *server,struct mxq_group_list *glist, struc
     return(0);
 }
 
-unsigned long start_job(struct mxq_group_list *glist)
+static unsigned long start_job(struct mxq_group_list *glist)
 {
     struct mxq_server *server;
     struct mxq_job_list *jlist;
@@ -1411,14 +1411,37 @@ unsigned long start_job(struct mxq_group_list *glist)
     return 1;
 }
 
-unsigned long start_user(struct mxq_user_list *ulist, long slots_to_start)
+static int can_start_job(struct mxq_group_list *group, unsigned long df_scratch, struct mxq_server *server, long slots_to_start) {
+    /* Can we start a(nother) job from this group? */
+    if (group->jobs_running >= group->group.group_jobs)
+        return 0;
+    if (group->jobs_running >= group->jobs_max)
+        return 0;
+    if (mxq_group_jobs_inq(&group->group) == 0)
+        return 0;
+    if (group->slots_per_job > slots_to_start)
+        return 0;
+    if (df_scratch/1024/1024/1024 < group->group.job_tmpdir_size + 20)
+        return 0;
+    if (group->group.job_gpu && server->daemon.gpus_max - server->daemon.gpus_used == 0)
+        return 0;
+    return 1;
+}
+
+static int can_start_job_for_user(struct mxq_user_list *user, unsigned long df_scratch, struct mxq_server *server, long slots_to_start) {
+    /* Can we start a(nother) job for this user? */
+    for (struct mxq_group_list *group = user->groups; group; group = group->next)
+        if (can_start_job(group, df_scratch, server, slots_to_start))
+            return 1;
+    return 0;
+}
+
+static unsigned long start_user(struct mxq_user_list *ulist, long slots_to_start, unsigned long df_scratch)
 {
     struct mxq_server *server;
     struct mxq_group_list *glist;
     struct mxq_group *group;
 
-    unsigned long df_scratch;
-
     assert(ulist);
     assert(ulist->server);
     assert(ulist->groups);
@@ -1432,37 +1455,17 @@ unsigned long start_user(struct mxq_user_list *ulist, long slots_to_start)
     mx_log_debug(" user=%s(%d) slots_to_start=%ld :: trying to start jobs for user.",
             group->user_name, group->user_uid, slots_to_start);
 
-    df_scratch=mx_df(MXQ_JOB_TMPDIR_FS "/.");
-
     for (glist = ulist->groups; glist ; glist = glist->next) {
         group = &glist->group;
 
-        if (glist->jobs_running >= group->group_jobs) {
-            continue;
-        }
-        if (glist->jobs_running >= glist->jobs_max) {
-            continue;
-        }
-        if (mxq_group_jobs_inq(group) == 0) {
-            continue;
-        }
-        if (glist->slots_per_job > slots_to_start) {
-            continue;
-        }
-        if (df_scratch/1024/1024/1024 < group->job_tmpdir_size + 20) {
-            continue;
-        }
-        if (group->job_gpu && server->daemon.gpus_max - server->daemon.gpus_used == 0) {
-            continue;
-        }
-
-        mx_log_info(" group=%s(%d):%lu slots_to_start=%ld slots_per_job=%lu :: trying to start job for group.",
+        if (can_start_job(glist, df_scratch, server, slots_to_start)) {
+            mx_log_info(" group=%s(%d):%lu slots_to_start=%ld slots_per_job=%lu :: trying to start job for group.",
                 group->user_name, group->user_uid, group->group_id, slots_to_start, glist->slots_per_job);
-
-        if (start_job(glist)) {
-            int slots_started = glist->slots_per_job;
-            return slots_started;
+            if (start_job(glist)) {
+                int slots_started = glist->slots_per_job;
+                return slots_started;
+            }
         }
     }
     return 0;
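The scratch-space test in can_start_job() divides the free space on MXQ_JOB_TMPDIR_FS by 1024 three times, which implies mx_df() reports bytes, and compares the result against the group's job_tmpdir_size plus 20 GiB of headroom (job_tmpdir_size being in GiB matches the web page below, which scales it by 1073741824). A worked example with made-up numbers, not mxq code:

    #include <stdio.h>

    int main(void)
    {
        /* hypothetical values: 150 GiB free on scratch, job wants a 100 GiB tmpdir */
        unsigned long long df_scratch = 150ULL * 1024 * 1024 * 1024;  /* bytes */
        unsigned long long job_tmpdir_size = 100;                     /* GiB */

        /* same expression as in can_start_job(): bytes -> GiB, plus 20 GiB headroom */
        if (df_scratch/1024/1024/1024 < job_tmpdir_size + 20)
            printf("skip group: not enough scratch\n");
        else
            printf("scratch ok\n");   /* here: 150 >= 100 + 20 */
        return 0;
    }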
@@ -1470,63 +1473,77 @@ unsigned long start_user(struct mxq_user_list *ulist, long slots_to_start)
 
 /**********************************************************************/
 
-long start_user_with_least_running_global_slot_count(struct mxq_server *server)
-{
-    struct mxq_user_list *ulist;
-    struct mxq_group_list *glist;
-    struct mxq_group *group;
-    unsigned long slots_started = 0;
-    unsigned long slots_free;
-    unsigned long global_slots_per_user;
-    int waiting = 0;
+static int could_potentially_start_job(struct mxq_group_list *group) {
+    /* Could we start a(nother) job from this group if we had more resources
+     * free? Note that group->jobs_max is the maximum number of jobs we are
+     * able to run, if we had enough resources.
+     */
+    if (group->jobs_max > group->jobs_running && group->group.group_jobs_inq)
+        return 1;
+    else
+        return 0;
+}
 
-    assert(server);
+static int could_potentially_start_job_for_user(struct mxq_user_list *user) {
+    for (struct mxq_group_list *group=user->groups; group; group=group->next)
+        if (could_potentially_start_job(group))
+            return 1;
+    return 0;
+}
 
-    if (!server->user_cnt)
-        return 0;
+static void move_user_to_end(struct mxq_server *server, struct mxq_user_list *user) {
+    struct mxq_user_list **ptr;
 
-    server_sort_users_by_running_global_slot_count(server);
-    slots_free = server->slots - server->slots_running;
+    if (!user->next)
+        return;
 
-    if (!slots_free)
-        return 0;
+    ptr = &server->users;
+    while (*ptr != user)
+        ptr = &(*ptr)->next;
+    *ptr = user->next;
+    ptr = &(user->next->next);
+    while (*ptr)
+        ptr = &(*ptr)->next;
+    *ptr = user;
+    user->next = NULL;
+}
 
-    global_slots_per_user = server->global_slots_running / server->user_cnt;
+static long start_user_with_least_running_global_slot_count(struct mxq_server *server)
+{
+    unsigned long slots_free = server->slots - server->slots_running;
+    if (!server->user_cnt || !slots_free)
+        return 0;
+    server_sort_users_by_running_global_slot_count(server);
+    unsigned long df_scratch=mx_df(MXQ_JOB_TMPDIR_FS "/.");
+    int waiting = 0;
 
-    for (ulist = server->users; ulist; ulist = ulist->next) {
-        /* if other users are waiting and this user is already using
-         * more slots than his fair share, do not start anything.
-         * (next users are using even more atm because list is sorted)
-         * */
-        if (waiting && ulist->global_slots_running > global_slots_per_user)
-            /* returning -1 here signals main, that it should set the
-             * server status in the database to we are WAITING, which is
-             * just informational.
-             * */
+    for (struct mxq_user_list *ulist = server->users; ulist; ulist = ulist->next) {
+        /* if a previous user is waiting for free resources, don't start jobs
+         * for later users. */
+        if (waiting && can_start_job_for_user(ulist, df_scratch, server, slots_free))
+            /* returning -1 here tells the daemon to set its status in the
+             * database to WAITING, which is just informational. */
             return -1;
 
-        slots_started = start_user(ulist, slots_free);
-        if (slots_started)
+        unsigned long slots_started = start_user(ulist, slots_free, df_scratch);
+        if (slots_started) {
+            /* move user to end of list so that we get a round-robin with
+             * other users which sort to the same precedence. */
+            move_user_to_end(server, ulist);
            return slots_started;
+        }
 
         if (waiting)
             continue;
 
-        /* we didn't start a job for this user. Have a second look at the gorups of *this*
+        /* we didn't start a job for this user. Have a second look at the groups of *this*
          * user to see, if he has jobs pending, which we were able to start if we
          * only had enough free resources.
          * If so, set a flag that we don't start jobs for following users, if they already got their
         * fair share.
-         * Note, that glist->jobs_max is the maximum number of jobs we are able to run, if we had
-         * free resources and is 0 for jobs, we are not able to run at all.
          * */
-        for (glist = ulist->groups; glist; glist = glist->next) {
-            group = &glist->group;
-            if (glist->jobs_max > glist->jobs_running && group->group_jobs_inq) {
-                waiting = 1;
-                break;
-            }
-        }
+        if (could_potentially_start_job_for_user(ulist))
+            waiting = 1;
     }
     return 0;
 }
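move_user_to_end() walks the singly linked user list with a pointer to pointer: ptr always names the link that is about to be rewritten, so the head of the list needs no special case. The same idea appears again further down in mxqd_control.c, where new users are appended at the tail. A standalone sketch of the technique with an int payload, illustrative only and not mxq code:

    #include <stdio.h>

    struct node { int val; struct node *next; };

    /* Same shape as move_user_to_end(): *ptr is always the pointer that will
     * be rewritten, so the list head is handled like any other link. */
    static void move_node_to_end(struct node **list, struct node *n)
    {
        struct node **ptr;

        if (!n->next)                 /* already last */
            return;

        ptr = list;
        while (*ptr != n)             /* find the link that points at n */
            ptr = &(*ptr)->next;
        *ptr = n->next;               /* unhook n */

        ptr = &n->next->next;         /* continue from the node that followed n */
        while (*ptr)                  /* walk to the terminating NULL */
            ptr = &(*ptr)->next;
        *ptr = n;                     /* append n */
        n->next = NULL;
    }

    int main(void)
    {
        struct node c = { 3, NULL }, b = { 2, &c }, a = { 1, &b };
        struct node *list = &a;

        move_node_to_end(&list, &a);
        for (struct node *p = list; p; p = p->next)
            printf("%d ", p->val);    /* prints: 2 3 1 */
        printf("\n");
        return 0;
    }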
@@ -1534,7 +1551,7 @@ long start_user_with_least_running_global_slot_count(struct mxq_server *server)
 
 /**********************************************************************/
 
-void server_dump(struct mxq_server *server)
+static void server_dump(struct mxq_server *server)
 {
     struct mxq_user_list *ulist;
     struct mxq_group_list *glist;
@@ -1596,7 +1613,7 @@ void server_dump(struct mxq_server *server)
     mx_log_info("====================== SERVER DUMP END ======================");
 }
 
-void server_free(struct mxq_server *server)
+static void server_free(struct mxq_server *server)
 {
     struct mxq_user_list *ulist, *unext;
     struct mxq_group_list *glist, *gnext;
@@ -1631,7 +1648,7 @@ void server_free(struct mxq_server *server)
     mx_log_finish();
 }
 
-void server_close(struct mxq_server *server)
+static void server_close(struct mxq_server *server)
 {
     if (server->pidfilename)
         unlink(server->pidfilename);
@@ -1784,7 +1801,7 @@ static void killstate_event(struct ppidcache *ppidcache, struct mxq_job_list *jl
     }
 }
 
-int killall(struct mxq_server *server)
+static int killall(struct mxq_server *server)
 {
     struct mxq_user_list *ulist;
     struct mxq_group_list *glist;
@@ -1808,7 +1825,7 @@ int killall(struct mxq_server *server)
     return 0;
 }
 
-int killall_over_time(struct ppidcache *ppidcache, struct mxq_server *server)
+static int killall_over_time(struct ppidcache *ppidcache, struct mxq_server *server)
 {
     struct mxq_user_list *ulist;
     struct mxq_group_list *glist;
@@ -1851,7 +1868,7 @@ int killall_over_time(struct ppidcache *ppidcache, struct mxq_server *server)
     return 0;
 }
 
-int killall_over_memory(struct ppidcache *ppidcache, struct mxq_server *server)
+static int killall_over_memory(struct ppidcache *ppidcache, struct mxq_server *server)
 {
     struct mxq_user_list *ulist;
     struct mxq_group_list *glist;
@@ -1918,7 +1935,7 @@ int killall_over_memory(struct ppidcache *ppidcache, struct mxq_server *server)
     return 0;
 }
 
-int killall_cancelled(struct ppidcache *ppidcache, struct mxq_server *server)
+static int killall_cancelled(struct ppidcache *ppidcache, struct mxq_server *server)
 {
     struct mxq_user_list *ulist;
     struct mxq_group_list *glist;
@@ -2380,7 +2397,7 @@ static int load_running_jobs(struct mxq_server *server)
     return job_cnt;
 }
 
-int catchall(struct mxq_server *server)
+static int catchall(struct mxq_server *server)
 {
     struct mxq_job_list *jlist;
     struct mxq_job *job;
@@ -2472,7 +2489,7 @@ int catchall(struct mxq_server *server)
     return cnt;
 }
 
-int load_running_groups(struct mxq_server *server)
+static int load_running_groups(struct mxq_server *server)
 {
     struct mxq_group_list *glist;
     struct mxq_group *grps;
@@ -2561,7 +2578,7 @@ static void server_umount_stale_job_mountdirs(struct mxq_server *server) {
     free(namelist);
 }
 
-int recover_from_previous_crash(struct mxq_server *server)
+static int recover_from_previous_crash(struct mxq_server *server)
 {
     assert(server);
     assert(server->mysql);
diff --git a/mxqd_control.c b/mxqd_control.c
index 02fd76ee..47903673 100644
--- a/mxqd_control.c
+++ b/mxqd_control.c
@@ -435,8 +435,11 @@ struct mxq_group_list *_server_add_group(struct mxq_server *server, struct mxq_g
 
     ulist->server = server;
 
-    ulist->next = server->users;
-    server->users = ulist;
+    /* add new user at tail, so longer waiting users are preferred */
+    struct mxq_user_list **lastptr = &server->users;
+    while (*lastptr)
+        lastptr = &(*lastptr)->next;
+    *lastptr = ulist;
 
     server->user_cnt++;
 
diff --git a/test_mx_util.c b/test_mx_util.c
index 6f473093..20b58c81 100644
--- a/test_mx_util.c
+++ b/test_mx_util.c
@@ -489,6 +489,7 @@ static void test_listsort(void)
     }
 
     /* (100 0 1 2 50 50 2 1 0 100) -> ( 0 0 1 1 2 2 50 50 100 100) stable */
+    /* also verify stable sort */
     for (int i=0;i<10;i++) {
         o[i].next= i==9 ? NULL : &o[i+1];
     }
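The new comments in mx_util.c and test_mx_util.c make explicit that _mx_sort_linked_list() has to be stable: elements that compare equal keep their input order, which is what the (100 0 1 2 50 50 2 1 0 100) test vector checks. That property is presumably what preserves the round-robin order established by move_user_to_end() when users with the same running-slot count are re-sorted. A small sketch of what stability means, using insertion sort on an array, not mxq code:

    #include <stdio.h>

    struct user { int slots_running; char name; };

    /* Insertion sort is stable: the comparison is strictly "greater than",
     * so elements with equal keys are never moved past each other. */
    static void stable_sort(struct user *a, int n)
    {
        for (int i = 1; i < n; i++) {
            struct user key = a[i];
            int j = i - 1;
            while (j >= 0 && a[j].slots_running > key.slots_running) {
                a[j + 1] = a[j];
                j--;
            }
            a[j + 1] = key;
        }
    }

    int main(void)
    {
        struct user u[] = { {2,'a'}, {0,'b'}, {2,'c'}, {0,'d'} };
        stable_sort(u, 4);
        for (int i = 0; i < 4; i++)
            printf("%c:%d ", u[i].name, u[i].slots_running);  /* b:0 d:0 a:2 c:2 */
        printf("\n");
        return 0;
    }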
diff --git a/web/pages/mxq/mxq.in b/web/pages/mxq/mxq.in
index f91d6918..eac6ac1f 100755
--- a/web/pages/mxq/mxq.in
+++ b/web/pages/mxq/mxq.in
@@ -135,29 +135,19 @@ sub humanSeconds {
 }
 
 sub size {
-    my $s = shift;
-    my $n = shift;
-    defined($n) or $n=1;
-    $s == 0 and return '0 B';
-
-    my @T=();
-    for (my $f = 4; $f >= 0; $f--) {
-        my $t = int($s/(1000**$f));
-        push @T, ($t > 0) ? $t : 0; # TB/GB/MB/Kb/B
-        $s -= $t*(1000**$f);
-    }
-    my @L;
-    for my $x ('TB','GB','MB','kB','B') {
-        my $y = shift @T;
-        if ($y != 0) {
-            if (defined($n)) {
-                $n--;
-                last if ($n < 0);
-            }
-            push @L,"$y $x";
-        }
-    }
-    return join(' ',@L);
+    # '     0 B  '
+    # '  1023 B  '
+    # '   1.0 KiB'
+    # '1023.9 MiB'
+    # '   1.0 GiB'
+
+    my ($s) = @_;
+    $s < 1024 and return sprintf('%6d B  ', $s);
+    for my $unit (qw(KiB MiB GiB TiB PiB EiB ZiB YiB)) {
+        $s /= 1024;
+        $s < 1024 and return sprintf ('%6.1f %s', $s, $unit);
+    }
+    return sprintf('%6.1f %s', $s, 'YiB');
 }
 
 sub days {
@@ -708,9 +698,9 @@ sub group_table {
             $q->td({class=>'number'},$q->a({href=>selfurl("/group/$group_id")},$group_id)),
             $q->td($group_name),
             $q->td({class=>'number'},$job_threads),
-            $q->td({class=>'number'},size($job_memory*1000**2)),
+            $q->td({class=>'number'},size($job_memory*1048576)),
             $q->td({class=>'number'},days($job_time*60)),
-            $q->td({class=>'number'}, $job_tmpdir_size ? size($job_tmpdir_size*1000*1000*1000) : '-'),
+            $q->td({class=>'number'}, $job_tmpdir_size ? size($job_tmpdir_size*1073741824) : '-'),
             $q->td({class=>'number'}, $job_gpu ? $job_gpu : ""),
             $q->td($q->a({href=>my_url('groups',{user_name=>$user_name})},$user_name)),
             $q->td(group_status($group_status)),
@@ -777,8 +767,8 @@ sub group_table {
             $q->td({class=>'number'},$q->a({href=>selfurl("/group/$group_id")},$group_id)),
             $q->td($group_name),
             $q->td({class=>'number'},$job_threads),
-            $q->td({class=>'number'},size($job_memory*1000**2)),
-            $q->td({class=>'number'},size($stats_max_sumrss*1000)),
+            $q->td({class=>'number'},size($job_memory*1048576)),
+            $q->td({class=>'number'},size($stats_max_sumrss*1024)),
             $q->td({class=>'number'},humanSeconds($job_time*60)),
             $q->td({class=>'number'},humanSeconds($stats_run_sec+$stats_idle_sec,2)),
             $q->td($q->a({href=>my_url('groups',{user_name=>$user_name})},$user_name)),
@@ -908,15 +898,15 @@ sub server() {
 #           $q->td($pid_starttime),
            $q->td({class=>'number'},$daemon_pid),
            $q->td({class=>'number'},$daemon_slots),
-           $q->td({class=>'number'},size($daemon_memory*1000000)),
+           $q->td({class=>'number'},size($daemon_memory*1048576)),
            $q->td({class=>'number'},$daemon_gpus_max),
            $q->td({class=>'number'},$daemon_maxtime ? $daemon_maxtime : ''),
-           $q->td({class=>'number'},size($daemon_memory_limit_slot_soft*1000000)),
+           $q->td({class=>'number'},size($daemon_memory_limit_slot_soft*1048576)),
 #           $q->td({class=>'number'},$daemon_memory_limit_slot_hard),
            $q->td({class=>'number'},$daemon_jobs_running),
            $q->td({class=>'number'},$daemon_slots_running),
            $q->td({class=>'number'},$daemon_threads_running),
-           $q->td({class=>'number'},size($daemon_memory_used*1000000)),
+           $q->td({class=>'number'},size($daemon_memory_used*1048576)),
            $q->td({class=>'number'},$daemon_gpus_used),
 #           $q->td($mtime),
 #           $q->td($daemon_start),
@@ -933,14 +923,14 @@
            $q->td({class=>'center', colspan=>3},$S{servers}.' servers'),
            $q->td($dist),
            $q->td({class=>'center', colspan=>3},$S{daemon_slots}.' cores'),
-           $q->td({class=>'number'},size($S{daemon_memory_sum}*1000**2)),
+           $q->td({class=>'number'},size($S{daemon_memory_sum}*1048576)),
            $q->td({class=>'number'},$S{daemon_gpus_max_sum}),
            $q->td(' '),
            $q->td(' '),
            $q->td(' '),
            $q->td({class=>'number'},$S{daemon_slots_running}),
            $q->td(' '),
-           $q->td({class=>'number'},size($S{daemon_memory_used_sum}*1000**2)),
+           $q->td({class=>'number'},size($S{daemon_memory_used_sum}*1048576)),
            $q->td({class=>'number'},$S{daemon_gpus_used_sum})
        );
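The size() rewrite switches the web pages from SI prefixes (1000-based TB/GB/MB/kB) to binary units, and the callers now scale their database values to bytes to match: job_memory and daemon_memory are multiplied by 1048576 (which implies they are stored in MiB), stats_max_sumrss by 1024 (KiB) and job_tmpdir_size by 1073741824 (GiB). A C sketch of the same formatting loop, illustrative only; the shipped helper is the Perl sub above:

    #include <stdio.h>

    /* Format a byte count with binary units, right-aligned like the Perl
     * size() helper: '  1023 B  ', '   1.0 KiB', '1023.9 MiB', ... */
    static void format_size(unsigned long long bytes, char *buf, size_t buflen)
    {
        static const char *units[] = { "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB" };
        double s = (double)bytes;
        size_t i;

        if (bytes < 1024) {
            snprintf(buf, buflen, "%6llu B  ", bytes);
            return;
        }
        for (i = 0; i < sizeof(units)/sizeof(units[0]); i++) {
            s /= 1024.0;
            if (s < 1024.0)
                break;
        }
        if (i == sizeof(units)/sizeof(units[0]))
            i--;                                   /* clamp to YiB, like the Perl version */
        snprintf(buf, buflen, "%6.1f %s", s, units[i]);
    }

    int main(void)
    {
        char buf[32];
        unsigned long long job_memory = 2048;      /* hypothetical group value, in MiB */

        format_size(job_memory * 1048576ULL, buf, sizeof buf);
        printf("[%s]\n", buf);                     /* prints: [   2.0 GiB] */
        return 0;
    }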