Skip to content

Commit

Permalink
mxqd: start users in order of least running global slot count
Browse files Browse the repository at this point in the history
  • Loading branch information
mariux committed Nov 4, 2015
1 parent bdb8114 commit 6e0a776
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 2 deletions.
56 changes: 54 additions & 2 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,54 @@ unsigned long start_users(struct mxq_server *server)

/**********************************************************************/

long start_user_with_least_running_global_slot_count(struct mxq_server *server)
{
struct mxq_user_list *ulist;
struct mxq_group_list *glist;
unsigned long slots_started = 0;
unsigned long slots_free;
unsigned long global_slots_per_user;
int waiting = 0;

assert(server);

if (!server->user_cnt)
return 0;

server_sort_users_by_running_global_slot_count(server);
slots_free = server->slots - server->slots_running;

if (!slots_free)
return 0;

global_slots_per_user = server->global_slots_running / server->user_cnt;

for (ulist = server->users; ulist; ulist = ulist->next) {
/* if other users are waiting and this user is already using
* more slots then avg user in cluster do not start anything
* (next users are using even more atm because list is sorted) */
if (waiting && ulist->global_slots_running > global_slots_per_user)
return -1;

slots_started = start_user(ulist, 1, slots_free);
if (slots_started)
return slots_started;

if (waiting)
continue;

for (glist = ulist->groups; glist; glist = glist->next) {
if (glist->jobs_max > glist->jobs_running) {
waiting = 1;
break;
}
}
}
return 0;
}

/**********************************************************************/

int remove_orphaned_group_lists(struct mxq_server *server)
{
struct mxq_user_list *ulist, *unext, *uprev;
Expand Down Expand Up @@ -2281,9 +2329,13 @@ int main(int argc, char *argv[])
continue;
}

slots_started = start_users(server);
if (slots_started)
slots_started = start_user_with_least_running_global_slot_count(server);
if (slots_started == -1) {
mx_log_debug("no slots_started => we have users waiting for free slots.");
slots_started = 0;
} else if (slots_started) {
mx_log_info("slots_started=%lu :: Main Loop started %lu slots.", slots_started, slots_started);
}

if (!slots_started && !slots_returned && !global_sigint_cnt && !global_sigterm_cnt) {
if (!server->jobs_running) {
Expand Down
51 changes: 51 additions & 0 deletions mxqd_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,54 @@ struct mxq_group_list *server_update_group(struct mxq_server *server, struct mxq

return _user_list_update_group(ulist, group);
}


void server_sort_users_by_running_global_slot_count(struct mxq_server *server)
{
struct mxq_user_list *ulist;
struct mxq_user_list *unext;
struct mxq_user_list *uprev;
struct mxq_user_list *uroot;
struct mxq_user_list *current;

assert(server);

if (!server->user_cnt)
return;

for (ulist = server->users, uroot = NULL; ulist; ulist = unext) {
unext = ulist->next;

ulist->next = NULL;

if (!uroot) {
uroot = ulist;
continue;
}

for (current = uroot, uprev = NULL; (current || uprev); uprev = current, current = current->next) {
if (!current) {
uprev->next = ulist;
break;
}
if (ulist->global_slots_running > current->global_slots_running) {
continue;
}
if (ulist->global_slots_running == current->global_slots_running
&& ulist->global_threads_running > current->global_threads_running) {
continue;
}

ulist->next = current;

if (!uprev) {
uroot = ulist;
} else {
uprev->next = ulist;
}
break;
}
}

server->users = uroot;
}
2 changes: 2 additions & 0 deletions mxqd_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ static struct mxq_group_list *_user_list_update_group(struct mxq_user_list *ulis
struct mxq_group_list *_server_add_group(struct mxq_server *server, struct mxq_group *group);
struct mxq_group_list *_user_list_add_group(struct mxq_user_list *ulist, struct mxq_group *group);

void server_sort_users_by_running_global_slot_count(struct mxq_server *server);

#endif

0 comments on commit 6e0a776

Please sign in to comment.