/*
 * Patch overview (touches mxqd.c, mxqd.h, mxqd_control.c, mxqd_control.h).
 *
 * mxqd.h
 *   Adds two counters, global_threads_running and global_slots_running, to
 *   struct mxq_group_list, struct mxq_user_list and struct mxq_server.
 *   Presumably these describe cluster-wide usage as opposed to the existing
 *   per-server slots_running/threads_running fields; verify against the
 *   producer of group->group_slots_running and group->group_jobs_running.
 *
 * mxqd_control.c
 *   _group_list_init(): additionally caps jobs_max at
 *     server->slots / slots_per_job.
 *     NOTE(review): slots_per_job is used as a divisor; its computation is
 *     not part of this patch, so confirm it can never be zero here.
 *   _user_list_add_group(): seeds the new group-list counters from
 *     group->group_slots_running and
 *     group->group_jobs_running * group->job_threads, and adds them to the
 *     owning user-list and server totals.
 *   _user_list_update_group(): for an already known group, subtracts the
 *     previously recorded counters from the user-list and server totals,
 *     recomputes them from the incoming group, and adds them back. Unknown
 *     groups are still forwarded to _user_list_add_group().
 *   server_sort_users_by_running_global_slot_count(): rebuilds server->users
 *     by insertion into a new list, ordered by ascending
 *     global_slots_running with global_threads_running as tie breaker.
 *     An element whose two keys equal an already inserted element is placed
 *     in front of it, so the relative order of equal users is not kept.
 *     Returns without touching the list when server->user_cnt is zero.
 *
 * mxqd_control.h
 *   Declares server_sort_users_by_running_global_slot_count().
 *
 * mxqd.c
 *   start_user_with_least_running_global_slot_count(server):
 *     Returns 0 when there are no users or when
 *     server->slots - server->slots_running is zero. Otherwise sorts the
 *     users (see above), computes the integer average
 *     server->global_slots_running / server->user_cnt, and walks the list
 *     from the head:
 *       - once an earlier user has been marked as waiting, the first user
 *         whose global_slots_running exceeds the average ends the walk with
 *         a return value of -1;
 *       - otherwise start_user(ulist, 1, slots_free) is called and a
 *         non-zero result is returned immediately;
 *       - a user for whom nothing was started is marked as waiting when any
 *         of its groups has jobs_max > jobs_running.
 *     Returns 0 when the list is exhausted.
 *     NOTE(review): the function returns long while slots_started is
 *     unsigned long and the caller logs the value with %lu; the type of the
 *     caller's variable and of start_user()'s result are not visible in
 *     this patch - confirm the -1 sentinel round-trips as intended.
 *     NOTE(review): slots_free is an unsigned difference; this assumes
 *     server->slots_running never exceeds server->slots - TODO confirm.
 *   server_dump(): logs the new global counters per user and per server and
 *     adds jobs_max and slots_per_job to the per-group line.
 *     NOTE(review): slots_per_job is printed with %d and jobs_max with %lu;
 *     the field types are declared outside this patch - verify they match.
 *   main(): calls the new function instead of start_users(); a result of -1
 *     is logged at debug level and then treated as 0 slots started.
 */
diff --git a/mxqd.c b/mxqd.c index 91134842..05610332 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1249,6 +1249,54 @@ unsigned long start_users(struct mxq_server *server) /**********************************************************************/ +long start_user_with_least_running_global_slot_count(struct mxq_server *server) +{ + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + unsigned long slots_started = 0; + unsigned long slots_free; + unsigned long global_slots_per_user; + int waiting = 0; + + assert(server); + + if (!server->user_cnt) + return 0; + + server_sort_users_by_running_global_slot_count(server); + slots_free = server->slots - server->slots_running; + + if (!slots_free) + return 0; + + global_slots_per_user = server->global_slots_running / server->user_cnt; + + for (ulist = server->users; ulist; ulist = ulist->next) { + /* if other users are waiting and this user is already using + * more slots than avg user in cluster do not start anything + * (next users are using even more atm because list is sorted) */ + if (waiting && ulist->global_slots_running > global_slots_per_user) + return -1; + + slots_started = start_user(ulist, 1, slots_free); + if (slots_started) + return slots_started; + + if (waiting) + continue; + + for (glist = ulist->groups; glist; glist = glist->next) { + if (glist->jobs_max > glist->jobs_running) { + waiting = 1; + break; + } + } + } + return 0; +} + +/**********************************************************************/ + int remove_orphaned_group_lists(struct mxq_server *server) { struct mxq_user_list *ulist, *unext, *uprev; @@ -1336,18 +1384,23 @@ void server_dump(struct mxq_server *server) continue; } group = &ulist->groups[0].group; - mx_log_info(" user=%s(%d) slots_running=%lu", + mx_log_info(" user=%s(%d) slots_running=%lu global_slots_running=%lu global_threads_running=%lu", group->user_name, group->user_uid, - ulist->slots_running); + ulist->slots_running, + ulist->global_slots_running, +
ulist->global_threads_running); for (glist = ulist->groups; glist; glist = glist->next) { group = &glist->group; - mx_log_info(" group=%s(%d):%lu %s jobs_in_q=%lu", + + mx_log_info(" group=%s(%d):%lu %s jobs_max=%lu slots_per_job=%d jobs_in_q=%lu", group->user_name, group->user_uid, group->group_id, group->group_name, + glist->jobs_max, + glist->slots_per_job, mxq_group_jobs_inq(group)); for (jlist = glist->jobs; jlist; jlist = jlist->next) { job = &jlist->job; @@ -1369,6 +1422,9 @@ void server_dump(struct mxq_server *server) server->slots, server->threads_running, server->jobs_running); + mx_log_info("global_slots_running=%lu global_threads_running=%lu", + server->global_slots_running, + server->global_threads_running); cpuset_log("cpu set running", &server->cpu_set_running); mx_log_info("====================== SERVER DUMP END ======================"); @@ -2275,9 +2331,13 @@ int main(int argc, char *argv[]) continue; } - slots_started = start_users(server); - if (slots_started) + slots_started = start_user_with_least_running_global_slot_count(server); + if (slots_started == -1) { + mx_log_debug("no slots_started => we have users waiting for free slots."); + slots_started = 0; + } else if (slots_started) { mx_log_info("slots_started=%lu :: Main Loop started %lu slots.", slots_started, slots_started); + } if (!slots_started && !slots_returned && !global_sigint_cnt && !global_sigterm_cnt) { if (!server->jobs_running) { diff --git a/mxqd.h b/mxqd.h index 8b115a99..82ce6be8 100644 --- a/mxqd.h +++ b/mxqd.h @@ -39,6 +39,9 @@ struct mxq_group_list { unsigned long slots_running; unsigned long memory_used; + unsigned long global_threads_running; + unsigned long global_slots_running; + short orphaned; }; @@ -54,6 +57,9 @@ struct mxq_user_list { unsigned long threads_running; unsigned long slots_running; unsigned long memory_used; + + unsigned long global_threads_running; + unsigned long global_slots_running; }; struct mxq_server { @@ -69,6 +75,9 @@ struct mxq_server {
unsigned long memory_used; cpu_set_t cpu_set_running; + unsigned long global_threads_running; + unsigned long global_slots_running; + unsigned long slots; unsigned long memory_total; long double memory_avg_per_slot; diff --git a/mxqd_control.c b/mxqd_control.c index 9055f434..73233414 100644 --- a/mxqd_control.c +++ b/mxqd_control.c @@ -52,6 +52,9 @@ static void _group_list_init(struct mxq_group_list *glist) } jobs_max /= group->job_threads; + if (jobs_max > server->slots / slots_per_job) + jobs_max = server->slots / slots_per_job; + /* limit maximum number of jobs on user/group request */ if (group->job_max_per_node && jobs_max > group->job_max_per_node) jobs_max = group->job_max_per_node; @@ -323,6 +326,15 @@ struct mxq_group_list *_user_list_add_group(struct mxq_user_list *ulist, struct ulist->group_cnt++; server->group_cnt++; + glist->global_slots_running = group->group_slots_running; + glist->global_threads_running = group->group_jobs_running * group->job_threads; + + ulist->global_slots_running += glist->global_slots_running; + ulist->global_threads_running += glist->global_threads_running; + + server->global_slots_running += glist->global_slots_running; + server->global_threads_running += glist->global_threads_running; + _group_list_init(glist); return glist; @@ -354,15 +366,34 @@ struct mxq_group_list *_server_add_group(struct mxq_server *server, struct mxq_g static struct mxq_group_list *_user_list_update_group(struct mxq_user_list *ulist, struct mxq_group *group) { struct mxq_group_list *glist; + struct mxq_server *server; assert(ulist); assert(group); + assert(ulist->server); + + server = ulist->server; glist = _group_list_find_by_group(ulist->groups, group); if (!glist) { return _user_list_add_group(ulist, group); } + server->global_slots_running -= glist->global_slots_running; + server->global_threads_running -= glist->global_threads_running; + + ulist->global_slots_running -= glist->global_slots_running; + ulist->global_threads_running -=
glist->global_threads_running; + + glist->global_slots_running = group->group_slots_running; + glist->global_threads_running = group->group_jobs_running * group->job_threads; + + ulist->global_slots_running += glist->global_slots_running; + ulist->global_threads_running += glist->global_threads_running; + + server->global_slots_running += glist->global_slots_running; + server->global_threads_running += glist->global_threads_running; + mxq_group_free_content(&glist->group); memcpy(&glist->group, group, sizeof(*group)); @@ -383,3 +414,54 @@ struct mxq_group_list *server_update_group(struct mxq_server *server, struct mxq return _user_list_update_group(ulist, group); } + + +void server_sort_users_by_running_global_slot_count(struct mxq_server *server) +{ + struct mxq_user_list *ulist; + struct mxq_user_list *unext; + struct mxq_user_list *uprev; + struct mxq_user_list *uroot; + struct mxq_user_list *current; + + assert(server); + + if (!server->user_cnt) + return; + + for (ulist = server->users, uroot = NULL; ulist; ulist = unext) { + unext = ulist->next; + + ulist->next = NULL; + + if (!uroot) { + uroot = ulist; + continue; + } + + for (current = uroot, uprev = NULL; (current || uprev); uprev = current, current = current->next) { + if (!current) { + uprev->next = ulist; + break; + } + if (ulist->global_slots_running > current->global_slots_running) { + continue; + } + if (ulist->global_slots_running == current->global_slots_running + && ulist->global_threads_running > current->global_threads_running) { + continue; + } + + ulist->next = current; + + if (!uprev) { + uroot = ulist; + } else { + uprev->next = ulist; + } + break; + } + } + + server->users = uroot; +} diff --git a/mxqd_control.h b/mxqd_control.h index 3a8d69e9..9459b5d4 100644 --- a/mxqd_control.h +++ b/mxqd_control.h @@ -26,4 +26,6 @@ static struct mxq_group_list *_user_list_update_group(struct mxq_user_list *ulis struct mxq_group_list *_server_add_group(struct mxq_server *server, struct mxq_group *group);
struct mxq_group_list *_user_list_add_group(struct mxq_user_list *ulist, struct mxq_group *group); +void server_sort_users_by_running_global_slot_count(struct mxq_server *server); + #endif