Skip to content

Commit

Permalink
Merge branch 'mxqsub'
Browse files Browse the repository at this point in the history
* mxqsub:
  mysql: Add column job_id_new to table mxq_job to link restarted job
  mx_mysql: Improve error handling
  mxqsub: Improve assertions
  mxqsub: Fix minor memory leaks
  mxqsub: Add --restart option
  • Loading branch information
mariux committed May 22, 2015
2 parents 1b6a8bd + 6c56ab1 commit dc7499b
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 41 deletions.
8 changes: 6 additions & 2 deletions mx_mysql.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ static int mx__mysql_stmt_prepare(struct mx_mysql_stmt *stmt, char *statement)
case CR_SERVER_GONE_ERROR:
case CR_SERVER_LOST:
return -(errno=EAGAIN);

case ER_PARSE_ERROR:
mx__mysql_stmt_log_emerg(stmt);
return -(errno=EBADRQC);
}

mx__mysql_stmt_log_emerg(stmt);
Expand Down Expand Up @@ -638,7 +642,7 @@ static inline int _mx_mysql_bind_validate(struct mx_mysql_bind *b)

for (i=0; i < b->count; i++) {
if (!(b->data[i].flags)) {
return -(errno=ENOENT);
return -(errno=EBADSLT);
}
}

Expand Down Expand Up @@ -913,7 +917,7 @@ int mx_mysql_statement_execute(struct mx_mysql_stmt *stmt, unsigned long long *c

res = _mx_mysql_bind_validate(&stmt->param);
if (res < 0) {
mx_log_debug("ERROR: param not initialized completely.");
mx_log_crit("MxSQL: parameter list for prepared statement not initialized completely.");
return res;
}

Expand Down
7 changes: 7 additions & 0 deletions mxq_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ struct mxq_job {
#define MXQ_JOB_STATUS_UNKNOWN 999
#define MXQ_JOB_STATUS_FINISHED 1000

/*
 * Bit masks for the 64-bit job_flags field of a job.
 *
 * The shifted operand is `unsigned long long` on purpose: shifting a plain
 * `int` by 62 or 63 positions is undefined behaviour in C and cannot
 * produce a 64-bit mask.
 */

/* Low bits: restart/requeue policy on host failure (set by mxqsub --restart). */
#define MXQ_JOB_FLAGS_RESTART_ON_HOSTFAIL (1ULL<<0)
#define MXQ_JOB_FLAGS_REQUEUE_ON_HOSTFAIL (1ULL<<1)

/* High bits. NOTE(review): presumably state markers maintained by the
 * server side rather than by the submitter -- confirm against the users
 * of these flags. */
#define MXQ_JOB_FLAGS_AUTORESTART         (1ULL<<62)
#define MXQ_JOB_FLAGS_HOSTFAIL            (1ULL<<63)


#define _to_string(s) #s
#define status_str(x) _to_string(x)

Expand Down
106 changes: 67 additions & 39 deletions mxqsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ static void print_usage(void)
" -m | --memory <size> set amount of memory in MiB (default: 2048)\n"
" -t | --time <minutes> set runtime in minutes (default: 15)\n"
"\n"
"Job handling:\n"
" Define what to do if something bad happens.\n"
"\n"
" -r | --restart [restartmode] restart job on system failure (default: 'never')\n"
"\n"
" available [restartmode]s:\n"
" 'samehost' only restart if running on the same host.\n"
" 'always' always restart or requeue. (default)\n"
"\n"
"Job grouping:\n"
" Grouping is done by default based on the jobs resource\n"
" and priority information, so that jobs using the same\n"
Expand Down Expand Up @@ -181,19 +190,17 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
return res;
}

assert(mx_mysql_statement_field_count(stmt) == 1);
assert(mx_mysql_statement_param_count(stmt) == 10);

mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name));
mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid));
mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name));
mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid));
mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group));
mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command));
mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads));
mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory));
mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time));
mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority));
res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name));
res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid));
res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name));
res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid));
res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group));
res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command));
res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads));
res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory));
res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time));
res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority));
assert(res == 0);

res = mx_mysql_statement_execute(stmt, &num_rows);
if (res < 0) {
Expand Down Expand Up @@ -255,19 +262,17 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g)
return res;
}

assert(mx_mysql_statement_field_count(stmt) == 0);
assert(mx_mysql_statement_param_count(stmt) == 10);

mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name));
mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid));
mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name));
mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid));
mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group));
mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command));
mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads));
mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory));
mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time));
mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority));
res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name));
res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid));
res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name));
res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid));
res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group));
res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command));
res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads));
res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory));
res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time));
res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->group_priority));
assert(res ==0);

res = mx_mysql_statement_execute(stmt, &num_rows);
if (res < 0) {
Expand Down Expand Up @@ -322,28 +327,32 @@ static int add_job(struct mx_mysql *mysql, struct mxq_job *j)

" job_umask = ?,"

" host_submit = ?");
" host_submit = ?,"

" job_flags = ?"
);
if (res < 0) {
mx_log_err("mx_mysql_statement_prepare(): %m");
mx_mysql_statement_close(&stmt);
return res;
}

assert(mx_mysql_statement_field_count(stmt) == 0);
assert(mx_mysql_statement_param_count(stmt) == 9);

mx_mysql_statement_param_bind(stmt, 0, uint16, &(j->job_priority));
mx_mysql_statement_param_bind(stmt, 1, uint64, &(j->group_id));
mx_mysql_statement_param_bind(stmt, 2, string, &(j->job_workdir));
mx_mysql_statement_param_bind(stmt, 3, uint16, &(j->job_argc));
mx_mysql_statement_param_bind(stmt, 4, string, &(j->job_argv_str));
mx_mysql_statement_param_bind(stmt, 5, string, &(j->job_stdout));
mx_mysql_statement_param_bind(stmt, 6, string, &(j->job_stderr));
mx_mysql_statement_param_bind(stmt, 7, uint32, &(j->job_umask));
mx_mysql_statement_param_bind(stmt, 8, string, &(j->host_submit));
res = mx_mysql_statement_param_bind(stmt, 0, uint16, &(j->job_priority));
res += mx_mysql_statement_param_bind(stmt, 1, uint64, &(j->group_id));
res += mx_mysql_statement_param_bind(stmt, 2, string, &(j->job_workdir));
res += mx_mysql_statement_param_bind(stmt, 3, uint16, &(j->job_argc));
res += mx_mysql_statement_param_bind(stmt, 4, string, &(j->job_argv_str));
res += mx_mysql_statement_param_bind(stmt, 5, string, &(j->job_stdout));
res += mx_mysql_statement_param_bind(stmt, 6, string, &(j->job_stderr));
res += mx_mysql_statement_param_bind(stmt, 7, uint32, &(j->job_umask));
res += mx_mysql_statement_param_bind(stmt, 8, string, &(j->host_submit));
res += mx_mysql_statement_param_bind(stmt, 9, uint64, &(j->job_flags));
assert(res ==0);

res = mx_mysql_statement_execute(stmt, &num_rows);
if (res < 0) {
mx_log_err("mx_mysql_statement_execute(): %m");
mx_mysql_statement_close(&stmt);
return res;
}

Expand Down Expand Up @@ -426,6 +435,7 @@ int main(int argc, char *argv[])
char *arg_mysql_default_file;
char *arg_mysql_default_group;
char arg_debug;
char arg_jobflags;

_mx_cleanup_free_ char *current_workdir = NULL;
_mx_cleanup_free_ char *arg_stdout_absolute = NULL;
Expand Down Expand Up @@ -457,6 +467,8 @@ int main(int argc, char *argv[])
MX_OPTION_NO_ARG("debug", 5),
MX_OPTION_NO_ARG("verbose", 'v'),

MX_OPTION_OPTIONAL_ARG("restartable", 'r'),

MX_OPTION_REQUIRED_ARG("group-name", 'N'),
MX_OPTION_REQUIRED_ARG("group-priority", 'P'),

Expand Down Expand Up @@ -498,6 +510,7 @@ int main(int argc, char *argv[])
arg_stderr = "stdout";
arg_umask = getumask();
arg_debug = 0;
arg_jobflags = 0;

arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP");
if (!arg_mysql_default_group)
Expand Down Expand Up @@ -536,6 +549,20 @@ int main(int argc, char *argv[])
mx_log_level_set(MX_LOG_INFO);
break;

case 'r':
if (!optctl.optarg || streq(optctl.optarg, "always")) {
arg_jobflags |= MXQ_JOB_FLAGS_RESTART_ON_HOSTFAIL;
arg_jobflags |= MXQ_JOB_FLAGS_REQUEUE_ON_HOSTFAIL;
} else if (streq(optctl.optarg, "samehost")) {
arg_jobflags |= MXQ_JOB_FLAGS_RESTART_ON_HOSTFAIL;
} else if (streq(optctl.optarg, "never")) {
arg_jobflags &= ~(MXQ_JOB_FLAGS_RESTART_ON_HOSTFAIL|MXQ_JOB_FLAGS_REQUEUE_ON_HOSTFAIL);
} else {
mx_log_crit("--restart '%s': restartmode unknown.", optctl.optarg);
exit(EX_CONFIG);
}
break;

case 'p':
if (mx_strtou16(optctl.optarg, &arg_priority) < 0) {
mx_log_crit("--priority '%s': %m", optctl.optarg);
Expand Down Expand Up @@ -700,6 +727,7 @@ int main(int argc, char *argv[])
group.job_memory = arg_memory;
group.job_time = arg_time;

job.job_flags = arg_jobflags;
job.job_priority = arg_priority;
job.job_workdir = arg_workdir;
job.job_stdout = arg_stdout;
Expand Down
6 changes: 6 additions & 0 deletions mysql/create_tables
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@

-- Upgrade step for an already existing mxq_job table: add the nullable
-- column job_id_new (NULL by default), positioned after date_end.
-- Used to link a restarted job; presumably holds the id of the job that
-- replaces this one -- confirm against the code that writes it.
ALTER TABLE mxq_job
ADD COLUMN job_id_new INT8 UNSIGNED NULL DEFAULT NULL
AFTER date_end;

-- Upgrade step for an already existing mxq_job table: add the job_flags
-- column (unsigned 64-bit, NOT NULL, default 0), positioned after job_status.
ALTER TABLE mxq_job
ADD COLUMN job_flags INT8 UNSIGNED NOT NULL DEFAULT 0
AFTER job_status;
Expand Down Expand Up @@ -82,6 +86,8 @@ CREATE TABLE IF NOT EXISTS mxq_job (
date_start TIMESTAMP NOT NULL DEFAULT 0,
date_end TIMESTAMP NOT NULL DEFAULT 0,

job_id_new INT8 UNSIGNED NULL DEFAULT NULL,

stats_status INT4 UNSIGNED NOT NULL DEFAULT 0,

stats_utime_sec INT4 UNSIGNED NOT NULL DEFAULT 0,
Expand Down

0 comments on commit dc7499b

Please sign in to comment.