Skip to content

Commit

Permalink
mxqsub: Add option --disabled-servers
Browse files Browse the repository at this point in the history
Add a new option so that specific servers can be excluded from starting
jobs for this group. E.g.:

    mxqsub --disabled-servers="dontpanic sigusr2" sleep 100
  • Loading branch information
donald committed Apr 13, 2020
1 parent 8c60883 commit 337ba3c
Showing 1 changed file with 35 additions and 8 deletions.
43 changes: 35 additions & 8 deletions mxqsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "mx_util.h"
#include "mx_getopt.h"
#include "mx_mysql.h"
#include "keywordset.h"

#include "mxq.h"

Expand Down Expand Up @@ -69,9 +70,10 @@ static void print_usage(void)
" Scheduling is done based on the resources a job needs and\n"
" on the priority given to the job.\n"
"\n"
" -j, --threads=NUMBER set number of threads (default: 1)\n"
" -m, --memory=SIZE set amount of memory (default: 2G)\n"
" --tmpdir=SIZE set size of MXQ_JOB_TMPDIR (default: 0)\n"
" -j, --threads=NUMBER set number of threads (default: 1)\n"
" -m, --memory=SIZE set amount of memory (default: 2G)\n"
" --tmpdir=SIZE set size of MXQ_JOB_TMPDIR (default: 0)\n"
" --disabled-servers=STRING set list of disabled servers (default: '')\n"
"\n"
" [SIZE] may be suffixed with a combination of T, G and M\n"
" to specify tebibytes, gibibytes and mebibytes.\n"
Expand Down Expand Up @@ -168,6 +170,7 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
" AND job_tmpdir_size = ?"
" AND job_max_per_node = ?"
" AND group_priority = ?"
" AND group_disabled_servers = ?"
" AND group_status = 0"
" AND group_flags & ? = 0 "
" ORDER BY group_id DESC"
Expand All @@ -189,7 +192,8 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size));
res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node));
res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(flags));
res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_disabled_servers));
res += mx_mysql_statement_param_bind(stmt, 13, uint64, &(flags));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -253,6 +257,7 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g
" AND job_tmpdir_size = ?"
" AND job_max_per_node = ?"
" AND group_priority = ?"
" AND group_disabled_servers = ?"
" AND group_status = 0"
" AND group_id = ?"
" AND group_flags & ? = 0 "
Expand All @@ -275,8 +280,9 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g
res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size));
res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node));
res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(g->group_id));
res += mx_mysql_statement_param_bind(stmt, 13, uint64, &(flags));
res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_disabled_servers));
res += mx_mysql_statement_param_bind(stmt, 13, uint64, &(g->group_id));
res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(flags));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -332,6 +338,7 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g
" AND job_tmpdir_size = ?"
" AND job_max_per_node = ?"
" AND group_priority = ?"
" AND group_disabled_servers = ?"
" AND group_status = 0"
" AND ("
"group_jobs_running > 0"
Expand All @@ -358,7 +365,8 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g
res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size));
res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node));
res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(flags));
res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_disabled_servers));
res += mx_mysql_statement_param_bind(stmt, 13, uint64, &(flags));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -415,7 +423,8 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g)
" job_time = ?,"
" job_tmpdir_size = ?,"
" job_max_per_node = ?,"
" group_priority = ?");
" group_priority = ?,"
" group_disabled_servers = ?");
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %m");
return -errno;
Expand All @@ -433,6 +442,7 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g)
res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size));
res += mx_mysql_statement_param_bind(stmt,10, uint16, &(g->job_max_per_node));
res += mx_mysql_statement_param_bind(stmt,11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt,12, string, &(g->group_disabled_servers));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -639,6 +649,7 @@ int main(int argc, char *argv[])
u_int16_t arg_priority;
char *arg_group_name;
u_int16_t arg_group_priority;
char *arg_disabled_servers;
char *arg_program_name;
u_int16_t arg_threads;
u_int64_t arg_memory;
Expand All @@ -659,6 +670,7 @@ int main(int argc, char *argv[])
_mx_cleanup_free_ char *arg_stdout_absolute = NULL;
_mx_cleanup_free_ char *arg_stderr_absolute = NULL;
_mx_cleanup_free_ char *arg_args = NULL;
_mx_cleanup_free_ char *disabled_servers = NULL;

int flags = 0;

Expand Down Expand Up @@ -713,6 +725,7 @@ int main(int argc, char *argv[])
MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'),
MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'),
MX_OPTION_REQUIRED_ARG("tmpdir", 7),
MX_OPTION_REQUIRED_ARG("disabled-servers", 8),
MX_OPTION_END
};

Expand All @@ -738,6 +751,7 @@ int main(int argc, char *argv[])
arg_jobflags = 0;
arg_groupid = UINT64_UNSET;
arg_tmpdir = 0;
arg_disabled_servers = NULL;

arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP");
if (!arg_mysql_default_group)
Expand Down Expand Up @@ -942,6 +956,10 @@ int main(int argc, char *argv[])
}
break;

case 8:
arg_disabled_servers = optctl.optarg;
break;

}
}

Expand Down Expand Up @@ -997,6 +1015,14 @@ int main(int argc, char *argv[])
arg_args = mx_strvec_to_str(argv);
assert(arg_args);

if ( arg_disabled_servers != NULL ) {
struct keywordset *kws = keywordset_new(arg_disabled_servers);
disabled_servers = keywordset_get(kws);
keywordset_free(kws);
} else {
disabled_servers = mx_strdup_forever("");
}

/******************************************************************/

memset(&job, 0, sizeof(job));
Expand All @@ -1013,6 +1039,7 @@ int main(int argc, char *argv[])
group.job_memory = arg_memory;
group.job_time = arg_time;
group.job_tmpdir_size = arg_tmpdir;
group.group_disabled_servers = disabled_servers;

group.job_max_per_node = arg_max_per_node;

Expand Down

0 comments on commit 337ba3c

Please sign in to comment.