From 1cd234959da2b40c702ed510f774615551fec5e5 Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Tue, 25 Aug 2015 16:16:58 +0200 Subject: [PATCH 1/4] mxqsub: Fix typos in usage (--help) --- mxqsub.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mxqsub.c b/mxqsub.c index 7faabe1..db02ea2 100644 --- a/mxqsub.c +++ b/mxqsub.c @@ -75,7 +75,7 @@ static void print_usage(void) "Job handling:\n" " Define what to do if something bad happens:\n" "\n" - " -r | --restart[=MODE] restart job on system failure (default: 'never')\n" + " -r, --restart[=MODE] restart job on system failure (default: 'never')\n" "\n" " available restart [MODE]s:\n" " 'never' do not restart\n" @@ -101,8 +101,8 @@ static void print_usage(void) "\n" "Change how to connect to the mysql server:\n" "\n" - " -M | --mysql-default-file[=MYSQLCNF] (default: %s)\n" - " -S | --mysql-default-group[=MYSQLGROUP] (default: %s)\n" + " -M, --mysql-default-file[=MYSQLCNF] (default: %s)\n" + " -S, --mysql-default-group[=MYSQLGROUP] (default: %s)\n" "\n" "Environment:\n" " MXQ_MYSQL_DEFAULT_FILE change default for [MYSQLCNF]\n" From 79fcd63d43c8570de5a2ca3768018454afac9a75 Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Tue, 25 Aug 2015 17:42:35 +0200 Subject: [PATCH 2/4] mxqsub: Add option --new-group to froce new group this is part of https://github.molgen.mpg.de/mariux64/mxq/issues/22 --- mxqsub.c | 110 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 106 insertions(+), 4 deletions(-) diff --git a/mxqsub.c b/mxqsub.c index db02ea2..26301b0 100644 --- a/mxqsub.c +++ b/mxqsub.c @@ -41,6 +41,7 @@ #define MXQ_JOB_STATUS_ACTIVE (1) +#define UINT64_UNSET (uint64_t)(-1) static void print_usage(void) { @@ -88,6 +89,9 @@ static void print_usage(void) " amount of resources and having the same priority\n" " are grouped and executed in parallel.\n" "\n" + " -n, --new-group create new group if group has no running or queued jobs\n" + " (default: reuse group based on automatic grouping)\n" + "\n" " -a, --command-alias=NAME set command alias (default: )\n" " -N, --group-name=NAME set group name (default: 'default')\n" " -P, --group-priority=PRIORITY set group priority (default: 127)\n" @@ -197,6 +201,86 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) return (int)num_rows; } +static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g) +{ + struct mx_mysql_stmt *stmt = NULL; + unsigned long long num_rows = 0; + int res; + + assert(mysql); + assert(g); + assert(g->group_id == 0); + + assert(g->group_name); assert(*g->group_name); + assert(g->group_priority); + assert(g->user_uid); assert(g->user_name); assert(*g->user_name); + assert(g->user_gid); assert(g->user_group); assert(*g->user_group); + assert(g->job_command); assert(*g->job_command); + assert(g->job_threads); assert(g->job_memory); assert(g->job_time); + + stmt = mx_mysql_statement_prepare(mysql, + "SELECT" + " group_id" + " FROM mxq_group " + " WHERE group_name = ?" + " AND user_uid = ?" + " AND user_name = ?" + " AND user_gid = ?" + " AND user_group = ?" + " AND job_command = ?" + " AND job_threads = ?" + " AND job_memory = ?" + " AND job_time = ?" + " AND group_priority = ?" + " AND group_status = 0" + " AND (" + "group_jobs_running > 0" + " OR group_jobs_inq > 0" + " OR group_jobs = 0" + ")" + " ORDER BY group_id DESC" + " LIMIT 1"); + if (!stmt) { + mx_log_err("mx_mysql_statement_prepare(): %m"); + return -errno; + } + + res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name)); + res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); + res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name)); + res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid)); + res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group)); + res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command)); + res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); + res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); + res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); + res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority)); + assert(res == 0); + + res = mx_mysql_statement_execute(stmt, &num_rows); + if (res < 0) { + mx_log_err("mx_mysql_statement_execute(): %m"); + mx_mysql_statement_close(&stmt); + return res; + } + assert(num_rows <= 1); + + if (num_rows) { + mx_mysql_statement_result_bind(stmt, 0, uint64, &(g->group_id)); + + res = mx_mysql_statement_fetch(stmt); + if (res < 0) { + mx_log_err("mx_mysql_statement_fetch(): %m"); + mx_mysql_statement_close(&stmt); + return res; + } + } + + mx_mysql_statement_close(&stmt); + + return (int)num_rows; +} + static int add_group(struct mx_mysql *mysql, struct mxq_group *g) { struct mx_mysql_stmt *stmt = NULL; @@ -332,14 +416,21 @@ static int add_job(struct mx_mysql *mysql, struct mxq_job *j) return (int)num_rows; } -static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags) +static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags, uint64_t group_id) { int res; struct mxq_group *g; g = j->group_ptr; - res = load_group_id(mysql, g); + if (group_id == UINT64_UNSET) { + res = load_group_id(mysql, g); + } else if (group_id == 0) { + res = load_group_id_run_or_wait(mysql, g); + } else { + assert(0); + } + if (res < 0) return res; @@ -393,6 +484,7 @@ int main(int argc, char *argv[]) u_int16_t arg_threads; u_int64_t arg_memory; u_int32_t arg_time; + u_int64_t arg_groupid; char *arg_workdir; char *arg_stdout; char *arg_stderr; @@ -434,6 +526,8 @@ int main(int argc, char *argv[]) MX_OPTION_OPTIONAL_ARG("restartable", 'r'), + MX_OPTION_NO_ARG("new-group", 'n'), + MX_OPTION_REQUIRED_ARG("group-name", 'N'), MX_OPTION_REQUIRED_ARG("group-priority", 'P'), @@ -456,7 +550,6 @@ int main(int argc, char *argv[]) MX_OPTION_END }; - /******************************************************************/ current_workdir = get_current_dir_name(); @@ -476,6 +569,7 @@ int main(int argc, char *argv[]) arg_umask = getumask(); arg_debug = 0; arg_jobflags = 0; + arg_groupid = UINT64_UNSET; arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP"); if (!arg_mysql_default_group) @@ -514,6 +608,14 @@ int main(int argc, char *argv[]) mx_log_level_set(MX_LOG_INFO); break; + case 'n': + if (arg_groupid != UINT64_UNSET && arg_groupid != 0) { + mx_log_crit("--new-group: invalid use. group-id already set to %lu.", arg_groupid); + exit(EX_CONFIG); + } + arg_groupid = 0; + break; + case 'r': if (!optctl.optarg || mx_streq(optctl.optarg, "always")) { arg_jobflags |= MXQ_JOB_FLAGS_RESTART_ON_HOSTFAIL; @@ -757,7 +859,7 @@ int main(int argc, char *argv[]) mx_log_info("MySQL: Connection to database established."); - res = mxq_submit_task(mysql, &job, flags); + res = mxq_submit_task(mysql, &job, flags, arg_groupid); mx_mysql_finish(&mysql); From 296af7d6d594dea1a67853af8abf7ee30e182e4f Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Wed, 26 Aug 2015 11:13:22 +0200 Subject: [PATCH 3/4] mxqsub: Add jobs to the group with greatest group_id when reusing groups --- mxqsub.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mxqsub.c b/mxqsub.c index 26301b0..ee0aa59 100644 --- a/mxqsub.c +++ b/mxqsub.c @@ -158,7 +158,7 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) " AND job_time = ?" " AND group_priority = ?" " AND group_status = 0" - " ORDER BY group_id " + " ORDER BY group_id DESC" " LIMIT 1"); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); From a6491d86f614f23c61849c3c5cce00e1479a1eea Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Wed, 26 Aug 2015 13:37:30 +0200 Subject: [PATCH 4/4] mxqsub: Add option --group-id=ID to force reusing group with group_id=ID this is part of https://github.molgen.mpg.de/mariux64/mxq/issues/22 --- mxqsub.c | 90 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/mxqsub.c b/mxqsub.c index ee0aa59..621e9bc 100644 --- a/mxqsub.c +++ b/mxqsub.c @@ -201,6 +201,76 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) return (int)num_rows; } +static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g) +{ + struct mx_mysql_stmt *stmt = NULL; + unsigned long long num_rows = 0; + int res; + + assert(mysql); + assert(g); + assert(g->group_id > 0); + + assert(g->group_name); assert(*g->group_name); + assert(g->group_priority); + assert(g->user_uid); assert(g->user_name); assert(*g->user_name); + assert(g->user_gid); assert(g->user_group); assert(*g->user_group); + assert(g->job_command); assert(*g->job_command); + assert(g->job_threads); assert(g->job_memory); assert(g->job_time); + + stmt = mx_mysql_statement_prepare(mysql, + "SELECT" + " group_id" + " FROM mxq_group " + " WHERE group_name = ?" + " AND user_uid = ?" + " AND user_name = ?" + " AND user_gid = ?" + " AND user_group = ?" + " AND job_command = ?" + " AND job_threads = ?" + " AND job_memory = ?" + " AND job_time = ?" + " AND group_priority = ?" + " AND group_status = 0" + " AND group_id = ?" + " ORDER BY group_id DESC" + " LIMIT 1"); + if (!stmt) { + mx_log_err("mx_mysql_statement_prepare(): %m"); + return -errno; + } + + res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name)); + res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); + res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name)); + res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid)); + res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group)); + res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command)); + res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); + res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); + res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); + res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority)); + res += mx_mysql_statement_param_bind(stmt, 10, uint64, &(g->group_id)); + assert(res == 0); + + res = mx_mysql_statement_execute(stmt, &num_rows); + if (res < 0) { + mx_log_err("mx_mysql_statement_execute(): %m"); + mx_mysql_statement_close(&stmt); + return res; + } + assert(num_rows <= 1); + + if (!num_rows) { + g->group_id = 0; + } + + mx_mysql_statement_close(&stmt); + + return (int)num_rows; +} + static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g) { struct mx_mysql_stmt *stmt = NULL; @@ -428,7 +498,12 @@ static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags, } else if (group_id == 0) { res = load_group_id_run_or_wait(mysql, g); } else { - assert(0); + g->group_id = group_id; + res = load_group_id_by_group_id(mysql, g); + if (res == 0) { + mx_log_crit("Could not load group with group_id=%lu: No matching group found. Aborting.", group_id); + return -(errno=ENOENT); + } } if (res < 0) @@ -528,6 +603,8 @@ int main(int argc, char *argv[]) MX_OPTION_NO_ARG("new-group", 'n'), + MX_OPTION_REQUIRED_ARG("group-id", 'g'), + MX_OPTION_REQUIRED_ARG("group-name", 'N'), MX_OPTION_REQUIRED_ARG("group-priority", 'P'), @@ -608,6 +685,17 @@ int main(int argc, char *argv[]) mx_log_level_set(MX_LOG_INFO); break; + case 'g': + if (arg_groupid == 0) { + mx_log_crit("--group-id: invalid use after --new-group."); + exit(EX_CONFIG); + } + if (mx_strtou64(optctl.optarg, &arg_groupid) < 0) { + mx_log_crit("--group-id '%s': %m", optctl.optarg); + exit(EX_CONFIG); + } + break; + case 'n': if (arg_groupid != UINT64_UNSET && arg_groupid != 0) { mx_log_crit("--new-group: invalid use. group-id already set to %lu.", arg_groupid);