Skip to content

Commit

Permalink
mxqsub: Add --prerequisites
Browse files Browse the repository at this point in the history
If the users specifies a prerequisites expression with --prerequisites,
parse it to verify its syntax and store it as a group attribute.
  • Loading branch information
donald committed Apr 19, 2020
1 parent 190d7d2 commit 405bfcb
Showing 1 changed file with 41 additions and 7 deletions.
48 changes: 41 additions & 7 deletions mxqsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include "mx_getopt.h"
#include "mx_mysql.h"
#include "keywordset.h"

#include "parser.tab.h"
#include "mxq.h"

#define MXQ_TASK_JOB_FORCE_APPEND (1<<0)
Expand Down Expand Up @@ -75,6 +75,7 @@ static void print_usage(void)
" --tmpdir=SIZE set size of MXQ_JOB_TMPDIR (default: 0)\n"
" --blacklist=STRING set list of blacklisted servers (default: '')\n"
" --whitelist=STRING set list of whitelisted servers (default: '')\n"
" --prerequisites=STRING set prerequisites (default: '')\n"
"\n"
" [SIZE] may be suffixed with a combination of T, G and M\n"
" to specify tebibytes, gibibytes and mebibytes.\n"
Expand Down Expand Up @@ -173,6 +174,7 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
" AND group_priority = ?"
" AND group_blacklist = ?"
" AND group_whitelist = ?"
" AND prerequisites = ?"
" AND group_status = 0"
" AND group_flags & ? = 0 "
" ORDER BY group_id DESC"
Expand All @@ -196,7 +198,8 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist));
res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist));
res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(flags));
res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->prerequisites));
res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(flags));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -262,6 +265,7 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g
" AND group_priority = ?"
" AND group_blacklist = ?"
" AND group_whitelist = ?"
" AND prerequisites = ?"
" AND group_status = 0"
" AND group_id = ?"
" AND group_flags & ? = 0 "
Expand All @@ -286,8 +290,9 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g
res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist));
res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist));
res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(g->group_id));
res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(flags));
res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->prerequisites));
res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(g->group_id));
res += mx_mysql_statement_param_bind(stmt, 16, uint64, &(flags));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -345,6 +350,7 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g
" AND group_priority = ?"
" AND group_blacklist = ?"
" AND group_whitelist = ?"
" AND prerequisites = ?"
" AND group_status = 0"
" AND ("
"group_jobs_running > 0"
Expand Down Expand Up @@ -373,7 +379,8 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g
res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist));
res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist));
res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(flags));
res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->prerequisites));
res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(flags));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -432,7 +439,8 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g)
" job_max_per_node = ?,"
" group_priority = ?,"
" group_blacklist = ?,"
" group_whitelist = ?");
" group_whitelist = ?,"
" prerequisites = ?");
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %m");
return -errno;
Expand All @@ -452,6 +460,7 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g)
res += mx_mysql_statement_param_bind(stmt,11, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt,12, string, &(g->group_blacklist));
res += mx_mysql_statement_param_bind(stmt,13, string, &(g->group_whitelist));
res += mx_mysql_statement_param_bind(stmt,14, string, &(g->prerequisites));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down Expand Up @@ -648,6 +657,21 @@ static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags,
return res;
}

static void verify_expression(char *expr) {
struct keywordset *tags=keywordset_new(NULL);
struct parser_context parser_context = {
.input = expr,
.tags = tags,
.pos=0,
.result = 0,
};
if (yyparse(&parser_context)) {
fprintf(stderr,"syntax error in prerequisites expression \"%s\"\n", expr);
exit(1);
}
keywordset_free(tags);
}

int main(int argc, char *argv[])
{
int i;
Expand All @@ -660,6 +684,7 @@ int main(int argc, char *argv[])
u_int16_t arg_group_priority;
char *arg_blacklist;
char *arg_whitelist;
char *arg_prerequisites;
char *arg_program_name;
u_int16_t arg_threads;
u_int64_t arg_memory;
Expand Down Expand Up @@ -738,6 +763,7 @@ int main(int argc, char *argv[])
MX_OPTION_REQUIRED_ARG("tmpdir", 7),
MX_OPTION_REQUIRED_ARG("blacklist", 8),
MX_OPTION_REQUIRED_ARG("whitelist", 9),
MX_OPTION_REQUIRED_ARG("prerequisites", 10),
MX_OPTION_END
};

Expand Down Expand Up @@ -765,6 +791,7 @@ int main(int argc, char *argv[])
arg_tmpdir = 0;
arg_blacklist = NULL;
arg_whitelist = NULL;
arg_prerequisites = "";

arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP");
if (!arg_mysql_default_group)
Expand Down Expand Up @@ -977,6 +1004,10 @@ int main(int argc, char *argv[])
arg_whitelist = optctl.optarg;
break;

case 10:
arg_prerequisites = optctl.optarg;
break;

}
}

Expand All @@ -986,6 +1017,9 @@ int main(int argc, char *argv[])
exit(EX_USAGE);
}

if (*arg_prerequisites != 0)
verify_expression(arg_prerequisites);

/* from this point values in argc,argv are the ones of the cluster job */

if (!arg_program_name) {
Expand Down Expand Up @@ -1048,7 +1082,6 @@ int main(int argc, char *argv[])
whitelist = mx_strdup_forever("");
}


/******************************************************************/

memset(&job, 0, sizeof(job));
Expand All @@ -1067,6 +1100,7 @@ int main(int argc, char *argv[])
group.job_tmpdir_size = arg_tmpdir;
group.group_blacklist = blacklist;
group.group_whitelist = whitelist;
group.prerequisites = arg_prerequisites;

group.job_max_per_node = arg_max_per_node;

Expand Down

0 comments on commit 405bfcb

Please sign in to comment.