From 6e0a776d6ef64864bf720844fdcee0f61e7451a9 Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Tue, 27 Oct 2015 18:24:46 +0100 Subject: [PATCH] mxqd: start users in order of least running global slot count --- mxqd.c | 56 ++++++++++++++++++++++++++++++++++++++++++++++++-- mxqd_control.c | 51 +++++++++++++++++++++++++++++++++++++++++++++ mxqd_control.h | 2 ++ 3 files changed, 107 insertions(+), 2 deletions(-) diff --git a/mxqd.c b/mxqd.c index 38a22895..f15df63e 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1249,6 +1249,54 @@ unsigned long start_users(struct mxq_server *server) /**********************************************************************/ +long start_user_with_least_running_global_slot_count(struct mxq_server *server) +{ + struct mxq_user_list *ulist; + struct mxq_group_list *glist; + unsigned long slots_started = 0; + unsigned long slots_free; + unsigned long global_slots_per_user; + int waiting = 0; + + assert(server); + + if (!server->user_cnt) + return 0; + + server_sort_users_by_running_global_slot_count(server); + slots_free = server->slots - server->slots_running; + + if (!slots_free) + return 0; + + global_slots_per_user = server->global_slots_running / server->user_cnt; + + for (ulist = server->users; ulist; ulist = ulist->next) { + /* if other users are waiting and this user is already using + * more slots than avg user in cluster do not start anything + * (next users are using even more atm because list is sorted) */ + if (waiting && ulist->global_slots_running > global_slots_per_user) + return -1; + + slots_started = start_user(ulist, 1, slots_free); + if (slots_started) + return slots_started; + + if (waiting) + continue; + + for (glist = ulist->groups; glist; glist = glist->next) { + if (glist->jobs_max > glist->jobs_running) { + waiting = 1; + break; + } + } + } + return 0; +} + +/**********************************************************************/ + int remove_orphaned_group_lists(struct mxq_server *server) { struct mxq_user_list *ulist, 
*unext, *uprev; @@ -2281,9 +2329,13 @@ int main(int argc, char *argv[]) continue; } - slots_started = start_users(server); - if (slots_started) + slots_started = start_user_with_least_running_global_slot_count(server); + if (slots_started == -1) { + mx_log_debug("no slots_started => we have users waiting for free slots."); + slots_started = 0; + } else if (slots_started) { mx_log_info("slots_started=%lu :: Main Loop started %lu slots.", slots_started, slots_started); + } if (!slots_started && !slots_returned && !global_sigint_cnt && !global_sigterm_cnt) { if (!server->jobs_running) { diff --git a/mxqd_control.c b/mxqd_control.c index 20a29c95..75497957 100644 --- a/mxqd_control.c +++ b/mxqd_control.c @@ -411,3 +411,54 @@ struct mxq_group_list *server_update_group(struct mxq_server *server, struct mxq return _user_list_update_group(ulist, group); } + + +void server_sort_users_by_running_global_slot_count(struct mxq_server *server) +{ + struct mxq_user_list *ulist; + struct mxq_user_list *unext; + struct mxq_user_list *uprev; + struct mxq_user_list *uroot; + struct mxq_user_list *current; + + assert(server); + + if (!server->user_cnt) + return; + + for (ulist = server->users, uroot = NULL; ulist; ulist = unext) { + unext = ulist->next; + + ulist->next = NULL; + + if (!uroot) { + uroot = ulist; + continue; + } + + for (current = uroot, uprev = NULL; (current || uprev); uprev = current, current = current->next) { + if (!current) { + uprev->next = ulist; + break; + } + if (ulist->global_slots_running > current->global_slots_running) { + continue; + } + if (ulist->global_slots_running == current->global_slots_running + && ulist->global_threads_running > current->global_threads_running) { + continue; + } + + ulist->next = current; + + if (!uprev) { + uroot = ulist; + } else { + uprev->next = ulist; + } + break; + } + } + + server->users = uroot; +} diff --git a/mxqd_control.h b/mxqd_control.h index 3a8d69e9..9459b5d4 100644 --- a/mxqd_control.h +++ b/mxqd_control.h @@ 
-26,4 +26,6 @@ static struct mxq_group_list *_user_list_update_group(struct mxq_user_list *ulis struct mxq_group_list *_server_add_group(struct mxq_server *server, struct mxq_group *group); struct mxq_group_list *_user_list_add_group(struct mxq_user_list *ulist, struct mxq_group *group); +void server_sort_users_by_running_global_slot_count(struct mxq_server *server); + #endif