Skip to content

Commit

Permalink
Merge remote-tracking branch 'mariux/issues/issue34'
Browse files Browse the repository at this point in the history
try to fix #34

* mariux/issues/issue34:
  mxqd_control: Fix calculation of jobs_max
  mxqd: start users in order of least running global slot count
  mxqd_control: Track database values for global running slots and jobs
  • Loading branch information
mariux committed Nov 4, 2015
2 parents 79db558 + 232f7b1 commit d7ae110
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 5 deletions.
70 changes: 65 additions & 5 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,54 @@ unsigned long start_users(struct mxq_server *server)

/**********************************************************************/

long start_user_with_least_running_global_slot_count(struct mxq_server *server)
{
struct mxq_user_list *ulist;
struct mxq_group_list *glist;
unsigned long slots_started = 0;
unsigned long slots_free;
unsigned long global_slots_per_user;
int waiting = 0;

assert(server);

if (!server->user_cnt)
return 0;

server_sort_users_by_running_global_slot_count(server);
slots_free = server->slots - server->slots_running;

if (!slots_free)
return 0;

global_slots_per_user = server->global_slots_running / server->user_cnt;

for (ulist = server->users; ulist; ulist = ulist->next) {
/* if other users are waiting and this user is already using
* more slots then avg user in cluster do not start anything
* (next users are using even more atm because list is sorted) */
if (waiting && ulist->global_slots_running > global_slots_per_user)
return -1;

slots_started = start_user(ulist, 1, slots_free);
if (slots_started)
return slots_started;

if (waiting)
continue;

for (glist = ulist->groups; glist; glist = glist->next) {
if (glist->jobs_max > glist->jobs_running) {
waiting = 1;
break;
}
}
}
return 0;
}

/**********************************************************************/

int remove_orphaned_group_lists(struct mxq_server *server)
{
struct mxq_user_list *ulist, *unext, *uprev;
Expand Down Expand Up @@ -1336,18 +1384,23 @@ void server_dump(struct mxq_server *server)
continue;
}
group = &ulist->groups[0].group;
mx_log_info(" user=%s(%d) slots_running=%lu",
mx_log_info(" user=%s(%d) slots_running=%lu global_slots_running=%lu global_threads_running=%lu",
group->user_name,
group->user_uid,
ulist->slots_running);
ulist->slots_running,
ulist->global_slots_running,
ulist->global_threads_running);

for (glist = ulist->groups; glist; glist = glist->next) {
group = &glist->group;
mx_log_info(" group=%s(%d):%lu %s jobs_in_q=%lu",

mx_log_info(" group=%s(%d):%lu %s jobs_max=%lu slots_per_job=%d jobs_in_q=%lu",
group->user_name,
group->user_uid,
group->group_id,
group->group_name,
glist->jobs_max,
glist->slots_per_job,
mxq_group_jobs_inq(group));
for (jlist = glist->jobs; jlist; jlist = jlist->next) {
job = &jlist->job;
Expand All @@ -1369,6 +1422,9 @@ void server_dump(struct mxq_server *server)
server->slots,
server->threads_running,
server->jobs_running);
mx_log_info("global_slots_running=%lu global_threads_running=%lu",
server->global_slots_running,
server->global_threads_running);
cpuset_log("cpu set running",
&server->cpu_set_running);
mx_log_info("====================== SERVER DUMP END ======================");
Expand Down Expand Up @@ -2275,9 +2331,13 @@ int main(int argc, char *argv[])
continue;
}

slots_started = start_users(server);
if (slots_started)
slots_started = start_user_with_least_running_global_slot_count(server);
if (slots_started == -1) {
mx_log_debug("no slots_started => we have users waiting for free slots.");
slots_started = 0;
} else if (slots_started) {
mx_log_info("slots_started=%lu :: Main Loop started %lu slots.", slots_started, slots_started);
}

if (!slots_started && !slots_returned && !global_sigint_cnt && !global_sigterm_cnt) {
if (!server->jobs_running) {
Expand Down
9 changes: 9 additions & 0 deletions mxqd.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ struct mxq_group_list {
unsigned long slots_running;
unsigned long memory_used;

unsigned long global_threads_running;
unsigned long global_slots_running;

short orphaned;
};

Expand All @@ -54,6 +57,9 @@ struct mxq_user_list {
unsigned long threads_running;
unsigned long slots_running;
unsigned long memory_used;

unsigned long global_threads_running;
unsigned long global_slots_running;
};

struct mxq_server {
Expand All @@ -69,6 +75,9 @@ struct mxq_server {
unsigned long memory_used;
cpu_set_t cpu_set_running;

unsigned long global_threads_running;
unsigned long global_slots_running;

unsigned long slots;
unsigned long memory_total;
long double memory_avg_per_slot;
Expand Down
82 changes: 82 additions & 0 deletions mxqd_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ static void _group_list_init(struct mxq_group_list *glist)
}
jobs_max /= group->job_threads;

if (jobs_max > server->slots / slots_per_job)
jobs_max = server->slots / slots_per_job;

/* limit maximum number of jobs on user/group request */
if (group->job_max_per_node && jobs_max > group->job_max_per_node)
jobs_max = group->job_max_per_node;
Expand Down Expand Up @@ -323,6 +326,15 @@ struct mxq_group_list *_user_list_add_group(struct mxq_user_list *ulist, struct
ulist->group_cnt++;
server->group_cnt++;

glist->global_slots_running = group->group_slots_running;
glist->global_threads_running = group->group_jobs_running * group->job_threads;

ulist->global_slots_running += glist->global_slots_running;
ulist->global_threads_running += glist->global_threads_running;

server->global_slots_running += glist->global_slots_running;
server->global_threads_running += glist->global_threads_running;

_group_list_init(glist);

return glist;
Expand Down Expand Up @@ -354,15 +366,34 @@ struct mxq_group_list *_server_add_group(struct mxq_server *server, struct mxq_g
static struct mxq_group_list *_user_list_update_group(struct mxq_user_list *ulist, struct mxq_group *group)
{
struct mxq_group_list *glist;
struct mxq_server *server;

assert(ulist);
assert(group);
assert(ulist->server);

server = ulist->server;

glist = _group_list_find_by_group(ulist->groups, group);
if (!glist) {
return _user_list_add_group(ulist, group);
}

server->global_slots_running -= glist->global_slots_running;
server->global_threads_running -= glist->global_threads_running;

ulist->global_slots_running -= glist->global_slots_running;
ulist->global_threads_running -= glist->global_threads_running;

glist->global_slots_running = group->group_slots_running;
glist->global_threads_running = group->group_jobs_running * group->job_threads;

ulist->global_slots_running += glist->global_slots_running;
ulist->global_threads_running += glist->global_threads_running;

server->global_slots_running += glist->global_slots_running;
server->global_threads_running += glist->global_threads_running;

mxq_group_free_content(&glist->group);

memcpy(&glist->group, group, sizeof(*group));
Expand All @@ -383,3 +414,54 @@ struct mxq_group_list *server_update_group(struct mxq_server *server, struct mxq

return _user_list_update_group(ulist, group);
}


void server_sort_users_by_running_global_slot_count(struct mxq_server *server)
{
struct mxq_user_list *ulist;
struct mxq_user_list *unext;
struct mxq_user_list *uprev;
struct mxq_user_list *uroot;
struct mxq_user_list *current;

assert(server);

if (!server->user_cnt)
return;

for (ulist = server->users, uroot = NULL; ulist; ulist = unext) {
unext = ulist->next;

ulist->next = NULL;

if (!uroot) {
uroot = ulist;
continue;
}

for (current = uroot, uprev = NULL; (current || uprev); uprev = current, current = current->next) {
if (!current) {
uprev->next = ulist;
break;
}
if (ulist->global_slots_running > current->global_slots_running) {
continue;
}
if (ulist->global_slots_running == current->global_slots_running
&& ulist->global_threads_running > current->global_threads_running) {
continue;
}

ulist->next = current;

if (!uprev) {
uroot = ulist;
} else {
uprev->next = ulist;
}
break;
}
}

server->users = uroot;
}
2 changes: 2 additions & 0 deletions mxqd_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ static struct mxq_group_list *_user_list_update_group(struct mxq_user_list *ulis
struct mxq_group_list *_server_add_group(struct mxq_server *server, struct mxq_group *group);
struct mxq_group_list *_user_list_add_group(struct mxq_user_list *ulist, struct mxq_group *group);

void server_sort_users_by_running_global_slot_count(struct mxq_server *server);

#endif

0 comments on commit d7ae110

Please sign in to comment.