Skip to content

Commit

Permalink
mxqsub: Add option --new-group to froce new group
Browse files Browse the repository at this point in the history
this is part of #22
  • Loading branch information
mariux committed Aug 26, 2015
1 parent 1cd2349 commit 79fcd63
Showing 1 changed file with 106 additions and 4 deletions.
110 changes: 106 additions & 4 deletions mxqsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

#define MXQ_JOB_STATUS_ACTIVE (1)

#define UINT64_UNSET (uint64_t)(-1)

static void print_usage(void)
{
Expand Down Expand Up @@ -88,6 +89,9 @@ static void print_usage(void)
" amount of resources and having the same priority\n"
" are grouped and executed in parallel.\n"
"\n"
" -n, --new-group create new group if group has no running or queued jobs\n"
" (default: reuse group based on automatic grouping)\n"
"\n"
" -a, --command-alias=NAME set command alias (default: <command>)\n"
" -N, --group-name=NAME set group name (default: 'default')\n"
" -P, --group-priority=PRIORITY set group priority (default: 127)\n"
Expand Down Expand Up @@ -197,6 +201,86 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
return (int)num_rows;
}

static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g)
{
struct mx_mysql_stmt *stmt = NULL;
unsigned long long num_rows = 0;
int res;

assert(mysql);
assert(g);
assert(g->group_id == 0);

assert(g->group_name); assert(*g->group_name);
assert(g->group_priority);
assert(g->user_uid); assert(g->user_name); assert(*g->user_name);
assert(g->user_gid); assert(g->user_group); assert(*g->user_group);
assert(g->job_command); assert(*g->job_command);
assert(g->job_threads); assert(g->job_memory); assert(g->job_time);

stmt = mx_mysql_statement_prepare(mysql,
"SELECT"
" group_id"
" FROM mxq_group "
" WHERE group_name = ?"
" AND user_uid = ?"
" AND user_name = ?"
" AND user_gid = ?"
" AND user_group = ?"
" AND job_command = ?"
" AND job_threads = ?"
" AND job_memory = ?"
" AND job_time = ?"
" AND group_priority = ?"
" AND group_status = 0"
" AND ("
"group_jobs_running > 0"
" OR group_jobs_inq > 0"
" OR group_jobs = 0"
")"
" ORDER BY group_id DESC"
" LIMIT 1");
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %m");
return -errno;
}

res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name));
res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid));
res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name));
res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid));
res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group));
res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command));
res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads));
res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory));
res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time));
res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
if (res < 0) {
mx_log_err("mx_mysql_statement_execute(): %m");
mx_mysql_statement_close(&stmt);
return res;
}
assert(num_rows <= 1);

if (num_rows) {
mx_mysql_statement_result_bind(stmt, 0, uint64, &(g->group_id));

res = mx_mysql_statement_fetch(stmt);
if (res < 0) {
mx_log_err("mx_mysql_statement_fetch(): %m");
mx_mysql_statement_close(&stmt);
return res;
}
}

mx_mysql_statement_close(&stmt);

return (int)num_rows;
}

static int add_group(struct mx_mysql *mysql, struct mxq_group *g)
{
struct mx_mysql_stmt *stmt = NULL;
Expand Down Expand Up @@ -332,14 +416,21 @@ static int add_job(struct mx_mysql *mysql, struct mxq_job *j)
return (int)num_rows;
}

static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags)
static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags, uint64_t group_id)
{
int res;
struct mxq_group *g;

g = j->group_ptr;

res = load_group_id(mysql, g);
if (group_id == UINT64_UNSET) {
res = load_group_id(mysql, g);
} else if (group_id == 0) {
res = load_group_id_run_or_wait(mysql, g);
} else {
assert(0);
}

if (res < 0)
return res;

Expand Down Expand Up @@ -393,6 +484,7 @@ int main(int argc, char *argv[])
u_int16_t arg_threads;
u_int64_t arg_memory;
u_int32_t arg_time;
u_int64_t arg_groupid;
char *arg_workdir;
char *arg_stdout;
char *arg_stderr;
Expand Down Expand Up @@ -434,6 +526,8 @@ int main(int argc, char *argv[])

MX_OPTION_OPTIONAL_ARG("restartable", 'r'),

MX_OPTION_NO_ARG("new-group", 'n'),

MX_OPTION_REQUIRED_ARG("group-name", 'N'),
MX_OPTION_REQUIRED_ARG("group-priority", 'P'),

Expand All @@ -456,7 +550,6 @@ int main(int argc, char *argv[])
MX_OPTION_END
};


/******************************************************************/

current_workdir = get_current_dir_name();
Expand All @@ -476,6 +569,7 @@ int main(int argc, char *argv[])
arg_umask = getumask();
arg_debug = 0;
arg_jobflags = 0;
arg_groupid = UINT64_UNSET;

arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP");
if (!arg_mysql_default_group)
Expand Down Expand Up @@ -514,6 +608,14 @@ int main(int argc, char *argv[])
mx_log_level_set(MX_LOG_INFO);
break;

case 'n':
if (arg_groupid != UINT64_UNSET && arg_groupid != 0) {
mx_log_crit("--new-group: invalid use. group-id already set to %lu.", arg_groupid);
exit(EX_CONFIG);
}
arg_groupid = 0;
break;

case 'r':
if (!optctl.optarg || mx_streq(optctl.optarg, "always")) {
arg_jobflags |= MXQ_JOB_FLAGS_RESTART_ON_HOSTFAIL;
Expand Down Expand Up @@ -757,7 +859,7 @@ int main(int argc, char *argv[])

mx_log_info("MySQL: Connection to database established.");

res = mxq_submit_task(mysql, &job, flags);
res = mxq_submit_task(mysql, &job, flags, arg_groupid);

mx_mysql_finish(&mysql);

Expand Down

0 comments on commit 79fcd63

Please sign in to comment.