diff --git a/mxqsub.c b/mxqsub.c index 7faabe1..621e9bc 100644 --- a/mxqsub.c +++ b/mxqsub.c @@ -41,6 +41,7 @@ #define MXQ_JOB_STATUS_ACTIVE (1) +#define UINT64_UNSET (uint64_t)(-1) static void print_usage(void) { @@ -75,7 +76,7 @@ static void print_usage(void) "Job handling:\n" " Define what to do if something bad happens:\n" "\n" - " -r | --restart[=MODE] restart job on system failure (default: 'never')\n" + " -r, --restart[=MODE] restart job on system failure (default: 'never')\n" "\n" " available restart [MODE]s:\n" " 'never' do not restart\n" @@ -88,6 +89,9 @@ static void print_usage(void) " amount of resources and having the same priority\n" " are grouped and executed in parallel.\n" "\n" + " -n, --new-group create new group if group has no running or queued jobs\n" + " (default: reuse group based on automatic grouping)\n" + "\n" " -a, --command-alias=NAME set command alias (default: )\n" " -N, --group-name=NAME set group name (default: 'default')\n" " -P, --group-priority=PRIORITY set group priority (default: 127)\n" @@ -101,8 +105,8 @@ static void print_usage(void) "\n" "Change how to connect to the mysql server:\n" "\n" - " -M | --mysql-default-file[=MYSQLCNF] (default: %s)\n" - " -S | --mysql-default-group[=MYSQLGROUP] (default: %s)\n" + " -M, --mysql-default-file[=MYSQLCNF] (default: %s)\n" + " -S, --mysql-default-group[=MYSQLGROUP] (default: %s)\n" "\n" "Environment:\n" " MXQ_MYSQL_DEFAULT_FILE change default for [MYSQLCNF]\n" @@ -154,7 +158,157 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) " AND job_time = ?" " AND group_priority = ?" " AND group_status = 0" - " ORDER BY group_id " + " ORDER BY group_id DESC" + " LIMIT 1"); + if (!stmt) { + mx_log_err("mx_mysql_statement_prepare(): %m"); + return -errno; + } + + res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name)); + res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); + res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name)); + res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid)); + res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group)); + res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command)); + res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); + res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); + res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); + res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority)); + assert(res == 0); + + res = mx_mysql_statement_execute(stmt, &num_rows); + if (res < 0) { + mx_log_err("mx_mysql_statement_execute(): %m"); + mx_mysql_statement_close(&stmt); + return res; + } + assert(num_rows <= 1); + + if (num_rows) { + mx_mysql_statement_result_bind(stmt, 0, uint64, &(g->group_id)); + + res = mx_mysql_statement_fetch(stmt); + if (res < 0) { + mx_log_err("mx_mysql_statement_fetch(): %m"); + mx_mysql_statement_close(&stmt); + return res; + } + } + + mx_mysql_statement_close(&stmt); + + return (int)num_rows; +} + +static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g) +{ + struct mx_mysql_stmt *stmt = NULL; + unsigned long long num_rows = 0; + int res; + + assert(mysql); + assert(g); + assert(g->group_id > 0); + + assert(g->group_name); assert(*g->group_name); + assert(g->group_priority); + assert(g->user_uid); assert(g->user_name); assert(*g->user_name); + assert(g->user_gid); assert(g->user_group); assert(*g->user_group); + assert(g->job_command); assert(*g->job_command); + assert(g->job_threads); assert(g->job_memory); assert(g->job_time); + + stmt = mx_mysql_statement_prepare(mysql, + "SELECT" + " group_id" + " FROM mxq_group " + " WHERE group_name = ?" + " AND user_uid = ?" + " AND user_name = ?" + " AND user_gid = ?" + " AND user_group = ?" + " AND job_command = ?" + " AND job_threads = ?" + " AND job_memory = ?" + " AND job_time = ?" + " AND group_priority = ?" + " AND group_status = 0" + " AND group_id = ?" + " ORDER BY group_id DESC" + " LIMIT 1"); + if (!stmt) { + mx_log_err("mx_mysql_statement_prepare(): %m"); + return -errno; + } + + res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name)); + res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); + res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name)); + res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid)); + res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group)); + res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command)); + res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); + res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); + res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); + res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority)); + res += mx_mysql_statement_param_bind(stmt, 10, uint64, &(g->group_id)); + assert(res == 0); + + res = mx_mysql_statement_execute(stmt, &num_rows); + if (res < 0) { + mx_log_err("mx_mysql_statement_execute(): %m"); + mx_mysql_statement_close(&stmt); + return res; + } + assert(num_rows <= 1); + + if (!num_rows) { + g->group_id = 0; + } + + mx_mysql_statement_close(&stmt); + + return (int)num_rows; +} + +static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g) +{ + struct mx_mysql_stmt *stmt = NULL; + unsigned long long num_rows = 0; + int res; + + assert(mysql); + assert(g); + assert(g->group_id == 0); + + assert(g->group_name); assert(*g->group_name); + assert(g->group_priority); + assert(g->user_uid); assert(g->user_name); assert(*g->user_name); + assert(g->user_gid); assert(g->user_group); assert(*g->user_group); + assert(g->job_command); assert(*g->job_command); + assert(g->job_threads); assert(g->job_memory); assert(g->job_time); + + stmt = mx_mysql_statement_prepare(mysql, + "SELECT" + " group_id" + " FROM mxq_group " + " WHERE group_name = ?" + " AND user_uid = ?" + " AND user_name = ?" + " AND user_gid = ?" + " AND user_group = ?" + " AND job_command = ?" + " AND job_threads = ?" + " AND job_memory = ?" + " AND job_time = ?" + " AND group_priority = ?" + " AND group_status = 0" + " AND (" + "group_jobs_running > 0" + " OR group_jobs_inq > 0" + " OR group_jobs = 0" + ")" + " ORDER BY group_id DESC" " LIMIT 1"); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); @@ -332,14 +486,26 @@ static int add_job(struct mx_mysql *mysql, struct mxq_job *j) return (int)num_rows; } -static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags) +static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags, uint64_t group_id) { int res; struct mxq_group *g; g = j->group_ptr; - res = load_group_id(mysql, g); + if (group_id == UINT64_UNSET) { + res = load_group_id(mysql, g); + } else if (group_id == 0) { + res = load_group_id_run_or_wait(mysql, g); + } else { + g->group_id = group_id; + res = load_group_id_by_group_id(mysql, g); + if (res == 0) { + mx_log_crit("Could not load group with group_id=%lu: No matching group found. Aborting.", group_id); + return -(errno=ENOENT); + } + } + if (res < 0) return res; @@ -393,6 +559,7 @@ int main(int argc, char *argv[]) u_int16_t arg_threads; u_int64_t arg_memory; u_int32_t arg_time; + u_int64_t arg_groupid; char *arg_workdir; char *arg_stdout; char *arg_stderr; @@ -434,6 +601,10 @@ int main(int argc, char *argv[]) MX_OPTION_OPTIONAL_ARG("restartable", 'r'), + MX_OPTION_NO_ARG("new-group", 'n'), + + MX_OPTION_REQUIRED_ARG("group-id", 'g'), + MX_OPTION_REQUIRED_ARG("group-name", 'N'), MX_OPTION_REQUIRED_ARG("group-priority", 'P'), @@ -456,7 +627,6 @@ int main(int argc, char *argv[]) MX_OPTION_END }; - /******************************************************************/ current_workdir = get_current_dir_name(); @@ -476,6 +646,7 @@ int main(int argc, char *argv[]) arg_umask = getumask(); arg_debug = 0; arg_jobflags = 0; + arg_groupid = UINT64_UNSET; arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP"); if (!arg_mysql_default_group) @@ -514,6 +685,25 @@ int main(int argc, char *argv[]) mx_log_level_set(MX_LOG_INFO); break; + case 'g': + if (arg_groupid == 0) { + mx_log_crit("--group-id: invalid use after --new-group."); + exit(EX_CONFIG); + } + if (mx_strtou64(optctl.optarg, &arg_groupid) < 0) { + mx_log_crit("--group-id '%s': %m", optctl.optarg); + exit(EX_CONFIG); + } + break; + + case 'n': + if (arg_groupid != UINT64_UNSET && arg_groupid != 0) { + mx_log_crit("--new-group: invalid use. group-id already set to %lu.", arg_groupid); + exit(EX_CONFIG); + } + arg_groupid = 0; + break; + case 'r': if (!optctl.optarg || mx_streq(optctl.optarg, "always")) { arg_jobflags |= MXQ_JOB_FLAGS_RESTART_ON_HOSTFAIL; @@ -757,7 +947,7 @@ int main(int argc, char *argv[]) mx_log_info("MySQL: Connection to database established."); - res = mxq_submit_task(mysql, &job, flags); + res = mxq_submit_task(mysql, &job, flags, arg_groupid); mx_mysql_finish(&mysql);