From 405bfcba0b833c99e77af08f4ef1a48efbedd72f Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sat, 18 Apr 2020 18:40:10 +0200 Subject: [PATCH] mxqsub: Add --prerequisites If the users specifies a prerequisites expression with --prerequisites, parse it to verify its syntax and store it as a group attribute. --- mxqsub.c | 48 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/mxqsub.c b/mxqsub.c index 0bac3b82..065fd02d 100644 --- a/mxqsub.c +++ b/mxqsub.c @@ -33,7 +33,7 @@ #include "mx_getopt.h" #include "mx_mysql.h" #include "keywordset.h" - +#include "parser.tab.h" #include "mxq.h" #define MXQ_TASK_JOB_FORCE_APPEND (1<<0) @@ -75,6 +75,7 @@ static void print_usage(void) " --tmpdir=SIZE set size of MXQ_JOB_TMPDIR (default: 0)\n" " --blacklist=STRING set list of blacklisted servers (default: '')\n" " --whitelist=STRING set list of whitelisted servers (default: '')\n" + " --prerequisites=STRING set prerequisites (default: '')\n" "\n" " [SIZE] may be suffixed with a combination of T, G and M\n" " to specify tebibytes, gibibytes and mebibytes.\n" @@ -173,6 +174,7 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) " AND group_priority = ?" " AND group_blacklist = ?" " AND group_whitelist = ?" + " AND prerequisites = ?" " AND group_status = 0" " AND group_flags & ? = 0 " " ORDER BY group_id DESC" @@ -196,7 +198,8 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist)); - res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(flags)); + res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->prerequisites)); + res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -262,6 +265,7 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g " AND group_priority = ?" " AND group_blacklist = ?" " AND group_whitelist = ?" + " AND prerequisites = ?" " AND group_status = 0" " AND group_id = ?" " AND group_flags & ? = 0 " @@ -286,8 +290,9 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist)); - res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(g->group_id)); - res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(flags)); + res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->prerequisites)); + res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(g->group_id)); + res += mx_mysql_statement_param_bind(stmt, 16, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -345,6 +350,7 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g " AND group_priority = ?" " AND group_blacklist = ?" " AND group_whitelist = ?" + " AND prerequisites = ?" " AND group_status = 0" " AND (" "group_jobs_running > 0" @@ -373,7 +379,8 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist)); - res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(flags)); + res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->prerequisites)); + res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -432,7 +439,8 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g) " job_max_per_node = ?," " group_priority = ?," " group_blacklist = ?," - " group_whitelist = ?"); + " group_whitelist = ?," + " prerequisites = ?"); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -errno; @@ -452,6 +460,7 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g) res += mx_mysql_statement_param_bind(stmt,11, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt,12, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt,13, string, &(g->group_whitelist)); + res += mx_mysql_statement_param_bind(stmt,14, string, &(g->prerequisites)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -648,6 +657,21 @@ static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags, return res; } +static void verify_expression(char *expr) { + struct keywordset *tags=keywordset_new(NULL); + struct parser_context parser_context = { + .input = expr, + .tags = tags, + .pos=0, + .result = 0, + }; + if (yyparse(&parser_context)) { + fprintf(stderr,"syntax error in prerequisites expression \"%s\"\n", expr); + exit(1); + } + keywordset_free(tags); +} + int main(int argc, char *argv[]) { int i; @@ -660,6 +684,7 @@ int main(int argc, char *argv[]) u_int16_t arg_group_priority; char *arg_blacklist; char *arg_whitelist; + char *arg_prerequisites; char *arg_program_name; u_int16_t arg_threads; u_int64_t arg_memory; @@ -738,6 +763,7 @@ int main(int argc, char *argv[]) MX_OPTION_REQUIRED_ARG("tmpdir", 7), MX_OPTION_REQUIRED_ARG("blacklist", 8), MX_OPTION_REQUIRED_ARG("whitelist", 9), + MX_OPTION_REQUIRED_ARG("prerequisites", 10), MX_OPTION_END }; @@ -765,6 +791,7 @@ int main(int argc, char *argv[]) arg_tmpdir = 0; arg_blacklist = NULL; arg_whitelist = NULL; + arg_prerequisites = ""; arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP"); if (!arg_mysql_default_group) @@ -977,6 +1004,10 @@ int main(int argc, char *argv[]) arg_whitelist = optctl.optarg; break; + case 10: + arg_prerequisites = optctl.optarg; + break; + } } @@ -986,6 +1017,9 @@ int main(int argc, char *argv[]) exit(EX_USAGE); } + if (*arg_prerequisites != 0) + verify_expression(arg_prerequisites); + /* from this point values in argc,argv are the ones of the cluster job */ if (!arg_program_name) { @@ -1048,7 +1082,6 @@ int main(int argc, char *argv[]) whitelist = mx_strdup_forever(""); } - /******************************************************************/ memset(&job, 0, sizeof(job)); @@ -1067,6 +1100,7 @@ int main(int argc, char *argv[]) group.job_tmpdir_size = arg_tmpdir; group.group_blacklist = blacklist; group.group_whitelist = whitelist; + group.prerequisites = arg_prerequisites; group.job_max_per_node = arg_max_per_node;