Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
mxqd: implement time contraint
This allows mxqd to be started with "-t <minutes>" to have a job time
contraint for the server. jobs which are submitten with a longer maximum
runtime are not started on this server.

This is an emergency implementation and might need some
cleanup/rethinking in the future. But we need the feature now.
  • Loading branch information
donald committed Jul 13, 2016
1 parent 19b1545 commit 0a4a773
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 23 deletions.
44 changes: 21 additions & 23 deletions mxqd.c
Expand Up @@ -340,6 +340,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
unsigned long arg_memory_total = 2048;
unsigned long arg_memory_limit_slot_soft = 0;
unsigned long arg_memory_limit_slot_hard = 0;
unsigned long arg_maxtime = 0;
int i;
struct mxq_daemon *daemon = &server->daemon;

Expand Down Expand Up @@ -369,6 +370,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
MX_OPTION_REQUIRED_ARG("hostname", 6),
MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'),
MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'),
MX_OPTION_OPTIONAL_ARG("max-time", 't'),
MX_OPTION_END
};

Expand Down Expand Up @@ -515,6 +517,13 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
case 'S':
arg_mysql_default_group = optctl.optarg;
break;

case 't':
if (mx_strtoul(optctl.optarg, &arg_maxtime) < 0) {
mx_log_err("Invalid argument supplied for option --max-time '%s': %m", optctl.optarg);
return -EX_USAGE;
}
break;
}
}

Expand Down Expand Up @@ -643,6 +652,9 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
mx_log_err("MAIN: cpuset_init() failed. exiting.");
return -EX_OSERR;
}

server->maxtime = arg_maxtime;

server->memory_total = arg_memory_total;

server->memory_avg_per_slot = (long double)server->memory_total / (long double)server->slots;
Expand Down Expand Up @@ -1233,7 +1245,7 @@ unsigned long start_job(struct mxq_group_list *glist)

/**********************************************************************/

unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_to_start)
unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_to_start, int *need_more_slots)
{
struct mxq_server *server;
struct mxq_group_list *glist;
Expand Down Expand Up @@ -1277,10 +1289,13 @@ unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_
if (mxq_group_jobs_inq(group) == 0) {
goto start_user_continue;
}
if (server->maxtime && glist->group.job_time>server->maxtime) {
goto start_user_continue;
}
if (glist->slots_per_job > slots_to_start) {
*need_more_slots=1;
goto start_user_continue;
}

if (group->group_priority < prio) {
if (started) {
goto start_user_rewind;
Expand Down Expand Up @@ -1315,12 +1330,10 @@ unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_
long start_user_with_least_running_global_slot_count(struct mxq_server *server)
{
struct mxq_user_list *ulist;
struct mxq_group_list *glist;
struct mxq_group *group;
unsigned long slots_started = 0;
unsigned long slots_free;
unsigned long global_slots_per_user;
int waiting = 0;
int need_more_slots=0;

assert(server);

Expand All @@ -1336,25 +1349,10 @@ long start_user_with_least_running_global_slot_count(struct mxq_server *server)
global_slots_per_user = server->global_slots_running / server->user_cnt;

for (ulist = server->users; ulist; ulist = ulist->next) {
/* if other users are waiting and this user is already using
* more slots then avg user in cluster do not start anything
* (next users are using even more atm because list is sorted) */
if (waiting && ulist->global_slots_running > global_slots_per_user)
return -1;

slots_started = start_user(ulist, 1, slots_free);
if (slots_started)
need_more_slots=0;
slots_started = start_user(ulist, 1, slots_free, &need_more_slots);
if (slots_started || need_more_slots) {
return slots_started;

if (waiting)
continue;

for (glist = ulist->groups; glist; glist = glist->next) {
group = &glist->group;
if (glist->jobs_max > glist->jobs_running && group->group_jobs_inq) {
waiting = 1;
break;
}
}
}
return 0;
Expand Down
1 change: 1 addition & 0 deletions mxqd.h
Expand Up @@ -86,6 +86,7 @@ struct mxq_server {
long double memory_avg_per_slot;
unsigned long memory_limit_slot_soft;
unsigned long memory_limit_slot_hard;
unsigned long maxtime;
cpu_set_t cpu_set_available;

struct mx_mysql *mysql;
Expand Down

0 comments on commit 0a4a773

Please sign in to comment.