Skip to content

Commit

Permalink
mxqsub: Add options --blacklist and --whitelist
Browse files Browse the repository at this point in the history
Add a new options so that specific mxqd servers can be excluded from starting
jobs for this group.

    mxqsub --whitelist "acedia avaritia" sleep 10   // start only on these
    mxqsub --blacklist dontpanic sleep 10 // start on any node but this one

Blacklist has priority, so

    mxqsub --white "kronos, uselessbox" --black kronos

would not start on kronos. The lists of blacklisted and whitelisted
servers can be modified with mxqset.
  • Loading branch information
donald committed Apr 15, 2020
1 parent 87f136e commit 2533ab5
Showing 1 changed file with 60 additions and 6 deletions.
66 changes: 60 additions & 6 deletions mxqsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "mx_util.h"
#include "mx_getopt.h"
#include "mx_mysql.h"
#include "keywordset.h"

#include "mxq.h"

Expand Down Expand Up @@ -71,7 +72,9 @@ static void print_usage(void)
"\n"
" -j, --threads=NUMBER set number of threads (default: 1)\n"
" -m, --memory=SIZE set amount of memory (default: 2G)\n"
" --tmpdir=SIZE set size of MXQ_JOB_TMPDIR (default: 0)\n"
" --tmpdir=SIZE set size of MXQ_JOB_TMPDIR (default: 0)\n"
" --blacklist=STRING set list of blacklisted servers (default: '')\n"
" --whitelist=STRING set list of whitelisted servers (default: '')\n"
"\n"
" [SIZE] may be suffixed with a combination of T, G and M\n"
" to specify tebibytes, gibibytes and mebibytes.\n"
Expand Down Expand Up @@ -168,6 +171,8 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
" AND job_tmpdir_size = ?"
" AND job_max_per_node = ?"
" AND group_priority = ?"
" AND group_blacklist = ?"
" AND group_whitelist = ?"
" AND group_status = 0"
" AND group_flags & ? = 0 "
" ORDER BY group_id DESC"
Expand All @@ -189,7 +194,9 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size));
res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node));
res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(flags));
res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist));
res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist));
res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(flags));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -253,6 +260,8 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g
" AND job_tmpdir_size = ?"
" AND job_max_per_node = ?"
" AND group_priority = ?"
" AND group_blacklist = ?"
" AND group_whitelist = ?"
" AND group_status = 0"
" AND group_id = ?"
" AND group_flags & ? = 0 "
Expand All @@ -275,8 +284,10 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g
res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size));
res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node));
res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(g->group_id));
res += mx_mysql_statement_param_bind(stmt, 13, uint64, &(flags));
res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist));
res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_whitelist));
res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(g->group_id));
res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(flags));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -332,6 +343,8 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g
" AND job_tmpdir_size = ?"
" AND job_max_per_node = ?"
" AND group_priority = ?"
" AND group_blacklist = ?"
" AND group_whitelist = ?"
" AND group_status = 0"
" AND ("
"group_jobs_running > 0"
Expand All @@ -358,7 +371,9 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g
res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size));
res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node));
res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(flags));
res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist));
res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist));
res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(flags));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -415,7 +430,9 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g)
" job_time = ?,"
" job_tmpdir_size = ?,"
" job_max_per_node = ?,"
" group_priority = ?");
" group_priority = ?,"
" group_blacklist = ?,"
" group_whitelist = ?");
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %m");
return -errno;
Expand All @@ -433,6 +450,8 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g)
res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size));
res += mx_mysql_statement_param_bind(stmt,10, uint16, &(g->job_max_per_node));
res += mx_mysql_statement_param_bind(stmt,11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt,12, string, &(g->group_blacklist));
res += mx_mysql_statement_param_bind(stmt,13, string, &(g->group_whitelist));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -639,6 +658,8 @@ int main(int argc, char *argv[])
u_int16_t arg_priority;
char *arg_group_name;
u_int16_t arg_group_priority;
char *arg_blacklist;
char *arg_whitelist;
char *arg_program_name;
u_int16_t arg_threads;
u_int64_t arg_memory;
Expand All @@ -659,6 +680,8 @@ int main(int argc, char *argv[])
_mx_cleanup_free_ char *arg_stdout_absolute = NULL;
_mx_cleanup_free_ char *arg_stderr_absolute = NULL;
_mx_cleanup_free_ char *arg_args = NULL;
_mx_cleanup_free_ char *blacklist = NULL;
_mx_cleanup_free_ char *whitelist = NULL;

int flags = 0;

Expand Down Expand Up @@ -713,6 +736,8 @@ int main(int argc, char *argv[])
MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'),
MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'),
MX_OPTION_REQUIRED_ARG("tmpdir", 7),
MX_OPTION_REQUIRED_ARG("blacklist", 8),
MX_OPTION_REQUIRED_ARG("whitelist", 9),
MX_OPTION_END
};

Expand All @@ -738,6 +763,8 @@ int main(int argc, char *argv[])
arg_jobflags = 0;
arg_groupid = UINT64_UNSET;
arg_tmpdir = 0;
arg_blacklist = NULL;
arg_whitelist = NULL;

arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP");
if (!arg_mysql_default_group)
Expand Down Expand Up @@ -942,6 +969,14 @@ int main(int argc, char *argv[])
}
break;

case 8:
arg_blacklist = optctl.optarg;
break;

case 9:
arg_whitelist = optctl.optarg;
break;

}
}

Expand Down Expand Up @@ -997,6 +1032,23 @@ int main(int argc, char *argv[])
arg_args = mx_strvec_to_str(argv);
assert(arg_args);

if ( arg_blacklist != NULL ) {
struct keywordset *kws = keywordset_new(arg_blacklist);
blacklist = keywordset_get(kws);
keywordset_free(kws);
} else {
blacklist = mx_strdup_forever("");
}

if ( arg_whitelist != NULL ) {
struct keywordset *kws = keywordset_new(arg_whitelist);
whitelist = keywordset_get(kws);
keywordset_free(kws);
} else {
whitelist = mx_strdup_forever("");
}


/******************************************************************/

memset(&job, 0, sizeof(job));
Expand All @@ -1013,6 +1065,8 @@ int main(int argc, char *argv[])
group.job_memory = arg_memory;
group.job_time = arg_time;
group.job_tmpdir_size = arg_tmpdir;
group.group_blacklist = blacklist;
group.group_whitelist = whitelist;

group.job_max_per_node = arg_max_per_node;

Expand Down

0 comments on commit 2533ab5

Please sign in to comment.