Skip to content

Commit

Permalink
Merge branch 'mxqsub'
Browse files Browse the repository at this point in the history
implements #22

* mxqsub:
  mxqsub: Add option --group-id=ID to force reusing group with group_id=ID
  mxqsub: Add jobs to the group with greatest group_id when reusing groups
  mxqsub: Add option --new-group to froce new group
  mxqsub: Fix typos in usage (--help)
  • Loading branch information
mariux committed Aug 26, 2015
2 parents 9864f49 + a6491d8 commit d8370d9
Showing 1 changed file with 198 additions and 8 deletions.
206 changes: 198 additions & 8 deletions mxqsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

#define MXQ_JOB_STATUS_ACTIVE (1)

#define UINT64_UNSET (uint64_t)(-1)

static void print_usage(void)
{
Expand Down Expand Up @@ -75,7 +76,7 @@ static void print_usage(void)
"Job handling:\n"
" Define what to do if something bad happens:\n"
"\n"
" -r | --restart[=MODE] restart job on system failure (default: 'never')\n"
" -r, --restart[=MODE] restart job on system failure (default: 'never')\n"
"\n"
" available restart [MODE]s:\n"
" 'never' do not restart\n"
Expand All @@ -88,6 +89,9 @@ static void print_usage(void)
" amount of resources and having the same priority\n"
" are grouped and executed in parallel.\n"
"\n"
" -n, --new-group create new group if group has no running or queued jobs\n"
" (default: reuse group based on automatic grouping)\n"
"\n"
" -a, --command-alias=NAME set command alias (default: <command>)\n"
" -N, --group-name=NAME set group name (default: 'default')\n"
" -P, --group-priority=PRIORITY set group priority (default: 127)\n"
Expand All @@ -101,8 +105,8 @@ static void print_usage(void)
"\n"
"Change how to connect to the mysql server:\n"
"\n"
" -M | --mysql-default-file[=MYSQLCNF] (default: %s)\n"
" -S | --mysql-default-group[=MYSQLGROUP] (default: %s)\n"
" -M, --mysql-default-file[=MYSQLCNF] (default: %s)\n"
" -S, --mysql-default-group[=MYSQLGROUP] (default: %s)\n"
"\n"
"Environment:\n"
" MXQ_MYSQL_DEFAULT_FILE change default for [MYSQLCNF]\n"
Expand Down Expand Up @@ -154,7 +158,157 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
" AND job_time = ?"
" AND group_priority = ?"
" AND group_status = 0"
" ORDER BY group_id "
" ORDER BY group_id DESC"
" LIMIT 1");
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %m");
return -errno;
}

res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name));
res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid));
res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name));
res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid));
res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group));
res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command));
res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads));
res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory));
res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time));
res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
if (res < 0) {
mx_log_err("mx_mysql_statement_execute(): %m");
mx_mysql_statement_close(&stmt);
return res;
}
assert(num_rows <= 1);

if (num_rows) {
mx_mysql_statement_result_bind(stmt, 0, uint64, &(g->group_id));

res = mx_mysql_statement_fetch(stmt);
if (res < 0) {
mx_log_err("mx_mysql_statement_fetch(): %m");
mx_mysql_statement_close(&stmt);
return res;
}
}

mx_mysql_statement_close(&stmt);

return (int)num_rows;
}

static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g)
{
struct mx_mysql_stmt *stmt = NULL;
unsigned long long num_rows = 0;
int res;

assert(mysql);
assert(g);
assert(g->group_id > 0);

assert(g->group_name); assert(*g->group_name);
assert(g->group_priority);
assert(g->user_uid); assert(g->user_name); assert(*g->user_name);
assert(g->user_gid); assert(g->user_group); assert(*g->user_group);
assert(g->job_command); assert(*g->job_command);
assert(g->job_threads); assert(g->job_memory); assert(g->job_time);

stmt = mx_mysql_statement_prepare(mysql,
"SELECT"
" group_id"
" FROM mxq_group "
" WHERE group_name = ?"
" AND user_uid = ?"
" AND user_name = ?"
" AND user_gid = ?"
" AND user_group = ?"
" AND job_command = ?"
" AND job_threads = ?"
" AND job_memory = ?"
" AND job_time = ?"
" AND group_priority = ?"
" AND group_status = 0"
" AND group_id = ?"
" ORDER BY group_id DESC"
" LIMIT 1");
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %m");
return -errno;
}

res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name));
res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid));
res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name));
res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid));
res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group));
res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command));
res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads));
res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory));
res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time));
res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority));
res += mx_mysql_statement_param_bind(stmt, 10, uint64, &(g->group_id));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
if (res < 0) {
mx_log_err("mx_mysql_statement_execute(): %m");
mx_mysql_statement_close(&stmt);
return res;
}
assert(num_rows <= 1);

if (!num_rows) {
g->group_id = 0;
}

mx_mysql_statement_close(&stmt);

return (int)num_rows;
}

static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g)
{
struct mx_mysql_stmt *stmt = NULL;
unsigned long long num_rows = 0;
int res;

assert(mysql);
assert(g);
assert(g->group_id == 0);

assert(g->group_name); assert(*g->group_name);
assert(g->group_priority);
assert(g->user_uid); assert(g->user_name); assert(*g->user_name);
assert(g->user_gid); assert(g->user_group); assert(*g->user_group);
assert(g->job_command); assert(*g->job_command);
assert(g->job_threads); assert(g->job_memory); assert(g->job_time);

stmt = mx_mysql_statement_prepare(mysql,
"SELECT"
" group_id"
" FROM mxq_group "
" WHERE group_name = ?"
" AND user_uid = ?"
" AND user_name = ?"
" AND user_gid = ?"
" AND user_group = ?"
" AND job_command = ?"
" AND job_threads = ?"
" AND job_memory = ?"
" AND job_time = ?"
" AND group_priority = ?"
" AND group_status = 0"
" AND ("
"group_jobs_running > 0"
" OR group_jobs_inq > 0"
" OR group_jobs = 0"
")"
" ORDER BY group_id DESC"
" LIMIT 1");
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %m");
Expand Down Expand Up @@ -332,14 +486,26 @@ static int add_job(struct mx_mysql *mysql, struct mxq_job *j)
return (int)num_rows;
}

static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags)
static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags, uint64_t group_id)
{
int res;
struct mxq_group *g;

g = j->group_ptr;

res = load_group_id(mysql, g);
if (group_id == UINT64_UNSET) {
res = load_group_id(mysql, g);
} else if (group_id == 0) {
res = load_group_id_run_or_wait(mysql, g);
} else {
g->group_id = group_id;
res = load_group_id_by_group_id(mysql, g);
if (res == 0) {
mx_log_crit("Could not load group with group_id=%lu: No matching group found. Aborting.", group_id);
return -(errno=ENOENT);
}
}

if (res < 0)
return res;

Expand Down Expand Up @@ -393,6 +559,7 @@ int main(int argc, char *argv[])
u_int16_t arg_threads;
u_int64_t arg_memory;
u_int32_t arg_time;
u_int64_t arg_groupid;
char *arg_workdir;
char *arg_stdout;
char *arg_stderr;
Expand Down Expand Up @@ -434,6 +601,10 @@ int main(int argc, char *argv[])

MX_OPTION_OPTIONAL_ARG("restartable", 'r'),

MX_OPTION_NO_ARG("new-group", 'n'),

MX_OPTION_REQUIRED_ARG("group-id", 'g'),

MX_OPTION_REQUIRED_ARG("group-name", 'N'),
MX_OPTION_REQUIRED_ARG("group-priority", 'P'),

Expand All @@ -456,7 +627,6 @@ int main(int argc, char *argv[])
MX_OPTION_END
};


/******************************************************************/

current_workdir = get_current_dir_name();
Expand All @@ -476,6 +646,7 @@ int main(int argc, char *argv[])
arg_umask = getumask();
arg_debug = 0;
arg_jobflags = 0;
arg_groupid = UINT64_UNSET;

arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP");
if (!arg_mysql_default_group)
Expand Down Expand Up @@ -514,6 +685,25 @@ int main(int argc, char *argv[])
mx_log_level_set(MX_LOG_INFO);
break;

case 'g':
if (arg_groupid == 0) {
mx_log_crit("--group-id: invalid use after --new-group.");
exit(EX_CONFIG);
}
if (mx_strtou64(optctl.optarg, &arg_groupid) < 0) {
mx_log_crit("--group-id '%s': %m", optctl.optarg);
exit(EX_CONFIG);
}
break;

case 'n':
if (arg_groupid != UINT64_UNSET && arg_groupid != 0) {
mx_log_crit("--new-group: invalid use. group-id already set to %lu.", arg_groupid);
exit(EX_CONFIG);
}
arg_groupid = 0;
break;

case 'r':
if (!optctl.optarg || mx_streq(optctl.optarg, "always")) {
arg_jobflags |= MXQ_JOB_FLAGS_RESTART_ON_HOSTFAIL;
Expand Down Expand Up @@ -757,7 +947,7 @@ int main(int argc, char *argv[])

mx_log_info("MySQL: Connection to database established.");

res = mxq_submit_task(mysql, &job, flags);
res = mxq_submit_task(mysql, &job, flags, arg_groupid);

mx_mysql_finish(&mysql);

Expand Down

0 comments on commit d8370d9

Please sign in to comment.