#define _GNU_SOURCE #include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <limits.h> #include <pwd.h> #include <grp.h> #include <assert.h> #include <sysexits.h> #include <stdarg.h> #include <mysql.h> #include <inttypes.h> #include <sys/time.h> #include <sys/resource.h> #include <string.h> #include "mxq_group.h" #include "mxq_job.h" #include "mx_log.h" #include "mx_util.h" #include "mx_getopt.h" #include "mx_mysql.h" #include "keywordset.h" #include "parser.tab.h" #include "mxq.h" #define UINT64_UNSET (uint64_t)(-1) static void print_usage(void) { mxq_print_generic_version(); printf( "\n" "Usage:\n" " %s [options] <command [arguments]>\n" "\n" "Synopsis:\n" " queue a job to be executed on a cluster node.\n" " <command [arguments]> will be executed on a node that offers\n" " enough resources to run the job. the following [options] can\n" " influence the job environment and the scheduling decisions made\n" " by the cluster:\n" "\n" "Job environment:\n" " -w, --workdir=DIRECTORY set working directory (has to exist)\n" " (default: current workdir)\n" " -o, --stdout=FILE set file to capture stdout (default: '/dev/null')\n" " -e, --stderr=FILE set file to capture stderr (default: <stdout>)\n" " -u, --umask=MASK set mode to use as umask (default: current umask)\n" " -p, --priority=PRIORITY set priority (default: 127)\n" "\n" "Job resource information:\n" " Scheduling is done based on the resources a job needs and\n" " on the priority given to the job.\n" "\n" " -j, --processors=NUMBER set number of processors (default: 1)\n" " -m, --memory=SIZE set amount of memory (default: 2G)\n" " --tmpdir=SIZE set size of MXQ_JOB_TMPDIR (default: 0)\n" " --gpu request a gpu\n" " --blacklist=STRING set list of blacklisted servers (default: '')\n" " --whitelist=STRING set list of whitelisted servers (default: '')\n" " --prerequisites=STRING set prerequisites (default: '')\n" " --tags=STRING set tags (default: '')\n" "\n" " [SIZE] may be suffixed with a combination of T, G and M\n" " to specify tebibytes, gibibytes and mebibytes.\n" " Defaults to mebibytes if no suffix is set.\n" "\n" " -t, --runtime=TIME set runtime (default: 15m)\n" "\n" " [TIME] may be suffixed with a combination of y, w, d, h and m\n" " to specify years, weeks, days, hours and minutes\n" " Defaults to minutes if no suffix is set.\n" "\n" " --max-jobs-per-node=NUMBER limit the number of jobs executed on each cluster node\n" " (default: 0 [limited by the server])\n" "\n" "Job grouping:\n" " Grouping is done by default based on the jobs resource\n" " and priority information, so that jobs using the same\n" " amount of resources and having the same priority\n" " are grouped and executed in parallel.\n" " (see 'mxqadmin --help' for details on how to close groups)\n" "\n" " -g, --group-id=ID Add job to group with group_id ID\n" " -n, --new-group Add job to a new group if it does not match any running or waiting group\n" " (default: Add job to a group based on automatic grouping)\n" "\n" " -a, --command-alias=NAME set command alias (default: <command>)\n" " -N, --group-name=NAME set group name (default: 'default')\n" " -P, --group-priority=PRIORITY set group priority (default: 127)\n" "\n" "Other options:\n" "\n" " -v, --verbose be more verbose\n" " --debug set debug log level (default: warning log level)\n" " -V, --version print version and exit\n" " -h, --help print this help and exit ;)\n" "\n" "Change how to connect to the mysql server:\n" "\n" " -M, --mysql-default-file[=MYSQLCNF] (default: %s)\n" " -S, --mysql-default-group[=MYSQLGROUP] (default: %s)\n" "\n" "Environment:\n" " MXQ_MYSQL_DEFAULT_FILE change default for [MYSQLCNF]\n" " MXQ_MYSQL_DEFAULT_GROUP change default for [MYSQLGROUP]\n" "\n", program_invocation_short_name, MXQ_MYSQL_DEFAULT_FILE_STR, MXQ_MYSQL_DEFAULT_GROUP_STR ); } mode_t getumask(void) { mode_t mask = umask( 0 ); umask(mask); return mask; } static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) { struct mx_mysql_stmt *stmt = NULL; unsigned long long num_rows = 0; int res; uint64_t flags; assert(mysql); assert(g); assert(g->group_id == 0); assert(g->group_name); assert(*g->group_name); assert(g->group_priority); assert(g->user_uid); assert(g->user_name); assert(*g->user_name); assert(g->user_gid); assert(g->user_group); assert(*g->user_group); assert(g->job_command); assert(*g->job_command); assert(g->job_threads); assert(g->job_memory); assert(g->job_time); flags = MXQ_GROUP_FLAG_CLOSED; stmt = mx_mysql_statement_prepare(mysql, "SELECT" " group_id," " group_jobs_inq" " FROM mxq_group " " WHERE group_name = ?" " AND user_uid = ?" " AND user_name = ?" " AND user_gid = ?" " AND user_group = ?" " AND job_command = ?" " AND job_threads = ?" " AND job_memory = ?" " AND job_time = ?" " AND job_tmpdir_size = ?" " AND job_gpu = ?" " AND job_max_per_node = ?" " AND group_priority = ?" " AND group_blacklist = ?" " AND group_whitelist = ?" " AND prerequisites = ?" " AND tags = ?" " AND group_status = 0" " AND group_flags & ? = 0 " " ORDER BY group_id DESC" " LIMIT 1"); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -errno; } res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name)); res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name)); res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid)); res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group)); res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command)); res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size)); res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_gpu)); res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->job_max_per_node)); res += mx_mysql_statement_param_bind(stmt, 12, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->group_whitelist)); res += mx_mysql_statement_param_bind(stmt, 15, string, &(g->prerequisites)); res += mx_mysql_statement_param_bind(stmt, 16, string, &(g->tags)); res += mx_mysql_statement_param_bind(stmt, 17, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) { mx_log_err("mx_mysql_statement_execute(): %m"); mx_mysql_statement_close(&stmt); return res; } assert(num_rows <= 1); if (num_rows) { mx_mysql_statement_result_bind(stmt, 0, uint64, &(g->group_id)); mx_mysql_statement_result_bind(stmt, 1, uint64, &(g->group_jobs_inq)); res = mx_mysql_statement_fetch(stmt); if (res < 0) { mx_log_err("mx_mysql_statement_fetch(): %m"); mx_mysql_statement_close(&stmt); return res; } } mx_mysql_statement_close(&stmt); return (int)num_rows; } static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g) { struct mx_mysql_stmt *stmt = NULL; unsigned long long num_rows = 0; int res; uint64_t flags; assert(mysql); assert(g); assert(g->group_id > 0); assert(g->group_name); assert(*g->group_name); assert(g->group_priority); assert(g->user_uid); assert(g->user_name); assert(*g->user_name); assert(g->user_gid); assert(g->user_group); assert(*g->user_group); assert(g->job_command); assert(*g->job_command); assert(g->job_threads); assert(g->job_memory); assert(g->job_time); flags = MXQ_GROUP_FLAG_CLOSED; stmt = mx_mysql_statement_prepare(mysql, "SELECT" " group_id" " FROM mxq_group " " WHERE group_name = ?" " AND user_uid = ?" " AND user_name = ?" " AND user_gid = ?" " AND user_group = ?" " AND job_command = ?" " AND job_threads = ?" " AND job_memory = ?" " AND job_time = ?" " AND job_tmpdir_size = ?" " AND job_gpu = ?" " AND job_max_per_node = ?" " AND group_priority = ?" " AND group_blacklist = ?" " AND group_whitelist = ?" " AND prerequisites = ?" " AND tags = ?" " AND group_status = 0" " AND group_id = ?" " AND group_flags & ? = 0 " " ORDER BY group_id DESC" " LIMIT 1"); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -errno; } res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name)); res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name)); res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid)); res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group)); res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command)); res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size)); res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_gpu)); res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->job_max_per_node)); res += mx_mysql_statement_param_bind(stmt, 12, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->group_whitelist)); res += mx_mysql_statement_param_bind(stmt, 15, string, &(g->prerequisites)); res += mx_mysql_statement_param_bind(stmt, 16, string, &(g->tags)); res += mx_mysql_statement_param_bind(stmt, 17, uint64, &(g->group_id)); res += mx_mysql_statement_param_bind(stmt, 18, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) { mx_log_err("mx_mysql_statement_execute(): %m"); mx_mysql_statement_close(&stmt); return res; } assert(num_rows <= 1); if (!num_rows) { g->group_id = 0; } mx_mysql_statement_close(&stmt); return (int)num_rows; } static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g) { struct mx_mysql_stmt *stmt = NULL; unsigned long long num_rows = 0; int res; uint64_t flags; assert(mysql); assert(g); assert(g->group_id == 0); assert(g->group_name); assert(*g->group_name); assert(g->group_priority); assert(g->user_uid); assert(g->user_name); assert(*g->user_name); assert(g->user_gid); assert(g->user_group); assert(*g->user_group); assert(g->job_command); assert(*g->job_command); assert(g->job_threads); assert(g->job_memory); assert(g->job_time); flags = MXQ_GROUP_FLAG_CLOSED; stmt = mx_mysql_statement_prepare(mysql, "SELECT" " group_id" " FROM mxq_group " " WHERE group_name = ?" " AND user_uid = ?" " AND user_name = ?" " AND user_gid = ?" " AND user_group = ?" " AND job_command = ?" " AND job_threads = ?" " AND job_memory = ?" " AND job_time = ?" " AND job_tmpdir_size = ?" " AND job_gpu = ?" " AND job_max_per_node = ?" " AND group_priority = ?" " AND group_blacklist = ?" " AND group_whitelist = ?" " AND prerequisites = ?" " AND tags = ?" " AND group_status = 0" " AND (" "group_jobs_running > 0" " OR group_jobs_inq > 0" " OR group_jobs = 0" ")" " AND group_flags & ? = 0 " " ORDER BY group_id DESC" " LIMIT 1"); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -errno; } res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name)); res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name)); res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid)); res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group)); res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command)); res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size)); res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_gpu)); res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->job_max_per_node)); res += mx_mysql_statement_param_bind(stmt, 12, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->group_whitelist)); res += mx_mysql_statement_param_bind(stmt, 15, string, &(g->prerequisites)); res += mx_mysql_statement_param_bind(stmt, 16, string, &(g->tags)); res += mx_mysql_statement_param_bind(stmt, 17, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) { mx_log_err("mx_mysql_statement_execute(): %m"); mx_mysql_statement_close(&stmt); return res; } assert(num_rows <= 1); if (num_rows) { mx_mysql_statement_result_bind(stmt, 0, uint64, &(g->group_id)); res = mx_mysql_statement_fetch(stmt); if (res < 0) { mx_log_err("mx_mysql_statement_fetch(): %m"); mx_mysql_statement_close(&stmt); return res; } } mx_mysql_statement_close(&stmt); return (int)num_rows; } static int add_group(struct mx_mysql *mysql, struct mxq_group *g) { struct mx_mysql_stmt *stmt = NULL; unsigned long long num_rows = 0; unsigned long long insert_id = 0; int res; assert(g->group_name); assert(*g->group_name); assert(g->group_priority); assert(g->user_uid); assert(g->user_name); assert(*g->user_name); assert(g->user_gid); assert(g->user_group); assert(*g->user_group); assert(g->job_command); assert(*g->job_command); assert(g->job_threads); assert(g->job_memory); assert(g->job_time); stmt = mx_mysql_statement_prepare(mysql, "INSERT INTO mxq_group SET" " group_name = ?," " user_uid = ?," " user_name = ?," " user_gid = ?," " user_group = ?," " job_command = ?," " job_threads = ?," " job_memory = ?," " job_time = ?," " job_tmpdir_size = ?," " job_gpu = ?," " job_max_per_node = ?," " group_priority = ?," " group_blacklist = ?," " group_whitelist = ?," " prerequisites = ?," " tags = ?"); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -errno; } res = mx_mysql_statement_param_bind(stmt, 0, string, &(g->group_name)); res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); res += mx_mysql_statement_param_bind(stmt, 2, string, &(g->user_name)); res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(g->user_gid)); res += mx_mysql_statement_param_bind(stmt, 4, string, &(g->user_group)); res += mx_mysql_statement_param_bind(stmt, 5, string, &(g->job_command)); res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size)); res += mx_mysql_statement_param_bind(stmt,10, uint16, &(g->job_gpu)); res += mx_mysql_statement_param_bind(stmt,11, uint16, &(g->job_max_per_node)); res += mx_mysql_statement_param_bind(stmt,12, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt,13, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt,14, string, &(g->group_whitelist)); res += mx_mysql_statement_param_bind(stmt,15, string, &(g->prerequisites)); res += mx_mysql_statement_param_bind(stmt,16, string, &(g->tags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) { mx_log_err("mx_mysql_statement_execute(): %m"); mx_mysql_statement_close(&stmt); return res; } assert(num_rows == 1); mx_mysql_statement_insert_id(stmt, &insert_id); g->group_id = insert_id; mx_mysql_statement_close(&stmt); return (int)num_rows; } static int add_job(struct mx_mysql *mysql, struct mxq_job *j) { struct mx_mysql_stmt *stmt = NULL; unsigned long long num_rows = 0; unsigned long long insert_id = 0; int res; assert(j); assert(j->job_priority); assert(j->group_id); assert(j->job_workdir); assert(*j->job_workdir); assert(j->job_argc); assert(j->job_argv); assert(*j->job_argv); assert(j->job_argv_str); assert(*j->job_argv_str); assert(j->job_stdout); assert(*j->job_stdout); assert(j->job_stderr); assert(*j->job_stderr); assert(j->job_umask); assert(j->host_submit); assert(*j->host_submit); stmt = mx_mysql_statement_prepare(mysql, "INSERT INTO mxq_job SET" " job_priority = ?," " group_id = ?," " job_workdir = ?," " job_argc = ? ," " job_argv = ?," " job_stdout = ?," " job_stderr = ?," " job_umask = ?," " host_submit = ?," " job_flags = ?" ); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); mx_mysql_statement_close(&stmt); return -errno; } res = mx_mysql_statement_param_bind(stmt, 0, uint16, &(j->job_priority)); res += mx_mysql_statement_param_bind(stmt, 1, uint64, &(j->group_id)); res += mx_mysql_statement_param_bind(stmt, 2, string, &(j->job_workdir)); res += mx_mysql_statement_param_bind(stmt, 3, uint32, &(j->job_argc)); res += mx_mysql_statement_param_bind(stmt, 4, string, &(j->job_argv_str)); res += mx_mysql_statement_param_bind(stmt, 5, string, &(j->job_stdout)); res += mx_mysql_statement_param_bind(stmt, 6, string, &(j->job_stderr)); res += mx_mysql_statement_param_bind(stmt, 7, uint32, &(j->job_umask)); res += mx_mysql_statement_param_bind(stmt, 8, string, &(j->host_submit)); res += mx_mysql_statement_param_bind(stmt, 9, uint64, &(j->job_flags)); assert(res ==0); res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) { mx_log_err("mx_mysql_statement_execute(): %m"); mx_mysql_statement_close(&stmt); return res; } assert(num_rows == 1); mx_mysql_statement_insert_id(stmt, &insert_id); assert(insert_id > 0); j->job_id = insert_id; res = mx_mysql_statement_close(&stmt); return (int)num_rows; } static int get_active_groups_for_user(struct mx_mysql *mysql, char *username) { struct mx_mysql_stmt *stmt; int res; unsigned long long num_rows; uint64_t count; stmt = mx_mysql_statement_prepare(mysql, "SELECT" " count(*)" " FROM mxq_group" " WHERE user_name = ?" " AND (group_jobs_inq > 0 OR group_jobs_running > 0)"); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -errno; } mx_mysql_statement_param_bind(stmt, 0, string, &username); res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) { mx_log_err("mx_mysql_statement_execute(): %m"); mx_mysql_statement_close(&stmt); return res; } assert(num_rows == 1); mx_mysql_statement_result_bind(stmt, 0, uint64, &count); res = mx_mysql_statement_fetch(stmt); if (res < 0) { mx_log_err("mx_mysql_statement_fetch(): %m"); mx_mysql_statement_close(&stmt); return res; } mx_mysql_statement_close(&stmt); assert(count<INT_MAX); return count; } static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags, uint64_t group_id) { int res; struct mxq_group *g; g = j->group_ptr; if (group_id == UINT64_UNSET) { res = load_group_id(mysql, g); } else if (group_id == 0) { res = load_group_id_run_or_wait(mysql, g); } else { g->group_id = group_id; res = load_group_id_by_group_id(mysql, g); if (res == 0) { mx_log_err("Could not load group with group_id=%lu: No matching open group found. Aborting.", group_id); return -(errno=ENOENT); } } if (res < 0) return res; if (res == 0) { if (get_active_groups_for_user(mysql,g->user_name)>=MXQ_MAX_ACTIVE_GROUPS_PER_USER) { mx_log_err("Limit of %d active groups reached for user %s",MXQ_MAX_ACTIVE_GROUPS_PER_USER,g->user_name); return -(errno=EDQUOT); } res = add_group(mysql, g); if (res < 0) return res; if (res == 0) { mx_log_err("Failed to add new group."); return -(errno=EIO); } mx_log_info("The new job will be added to new group with group_id=%lu", g->group_id); } else { mx_log_debug("pending: %lu",g->group_jobs_inq); if (g->group_jobs_inq >= MXQ_MAX_PENDING_JOBS_PER_GROUP) { mx_log_err("Limit of %d pending jobs per group reached for group_id %lu",MXQ_MAX_PENDING_JOBS_PER_GROUP,g->group_id); return -(errno=EDQUOT); } mx_log_info("The new job will be attached to existing group with group_id=%lu", g->group_id); } assert(g->group_id); j->group_id = g->group_id; res = add_job(mysql, j); if (res < 0) return res; if (res == 0) { mx_log_err("Failed to add job group."); return -(errno=EIO); } mx_log_info("The new job has been queued successfully with job_id=%lu in group with group_id=%lu", j->job_id, g->group_id); assert(j->job_id); return res; } static void verify_expression(char *expr) { struct keywordset *tags=keywordset_new(NULL); struct parser_context parser_context = { .input = expr, .tags = tags, .pos=0, .result = 0, }; if (yyparse(&parser_context)) { fprintf(stderr,"syntax error in prerequisites expression \"%s\"\n", expr); exit(1); } keywordset_free(tags); } int main(int argc, char *argv[]) { int i; uid_t ruid, euid, suid; gid_t rgid, egid, sgid; int res; u_int16_t arg_priority; char *arg_group_name; u_int16_t arg_group_priority; char *arg_blacklist; char *arg_whitelist; char *arg_prerequisites; char *arg_tags; char *arg_program_name; u_int16_t arg_threads; u_int64_t arg_memory; u_int32_t arg_time; u_int16_t arg_max_per_node; u_int64_t arg_groupid; char *arg_workdir; char *arg_stdout; char *arg_stderr; mode_t arg_umask; char *arg_mysql_default_file; char *arg_mysql_default_group; char arg_debug; char arg_jobflags; u_int32_t arg_tmpdir; u_int16_t arg_gpu; _mx_cleanup_free_ char *current_workdir = NULL; _mx_cleanup_free_ char *arg_stdout_absolute = NULL; _mx_cleanup_free_ char *arg_stderr_absolute = NULL; _mx_cleanup_free_ char *arg_args = NULL; _mx_cleanup_free_ char *blacklist = NULL; _mx_cleanup_free_ char *whitelist = NULL; _mx_cleanup_free_ char *tags = NULL; int flags = 0; struct mxq_job job; struct mxq_group group; struct mx_mysql *mysql = NULL; struct passwd *passwd; struct group *grp; char *p; int opt; struct mx_getopt_ctl optctl; struct mx_option opts[] = { MX_OPTION_NO_ARG("help", 'h'), MX_OPTION_NO_ARG("version", 'V'), MX_OPTION_REQUIRED_ARG("group_priority", 2), MX_OPTION_REQUIRED_ARG("time", 4), MX_OPTION_NO_ARG("debug", 5), MX_OPTION_NO_ARG("verbose", 'v'), MX_OPTION_OPTIONAL_ARG("restartable", 'r'), MX_OPTION_NO_ARG("new-group", 'n'), MX_OPTION_REQUIRED_ARG("group-id", 'g'), MX_OPTION_REQUIRED_ARG("group-name", 'N'), MX_OPTION_REQUIRED_ARG("group-priority", 'P'), MX_OPTION_REQUIRED_ARG("command-alias", 'a'), MX_OPTION_REQUIRED_ARG("workdir", 'w'), MX_OPTION_REQUIRED_ARG("stdout", 'o'), MX_OPTION_REQUIRED_ARG("stderr", 'e'), MX_OPTION_REQUIRED_ARG("umask", 'u'), MX_OPTION_REQUIRED_ARG("priority", 'p'), MX_OPTION_REQUIRED_ARG("threads", 1), MX_OPTION_REQUIRED_ARG("processors", 'j'), MX_OPTION_REQUIRED_ARG("memory", 'm'), MX_OPTION_REQUIRED_ARG("runtime", 't'), MX_OPTION_REQUIRED_ARG("max-jobs-per-node", 6), MX_OPTION_REQUIRED_ARG("define", 'D'), MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'), MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'), MX_OPTION_REQUIRED_ARG("tmpdir", 7), MX_OPTION_REQUIRED_ARG("blacklist", 8), MX_OPTION_REQUIRED_ARG("whitelist", 9), MX_OPTION_REQUIRED_ARG("prerequisites", 10), MX_OPTION_REQUIRED_ARG("tags", 11), MX_OPTION_NO_ARG("gpu", 12), MX_OPTION_END }; /******************************************************************/ current_workdir = get_current_dir_name(); /******************************************************************/ arg_priority = 127; arg_group_name = "default"; arg_group_priority = 127; arg_program_name = NULL; arg_threads = 1; arg_memory = 2048; arg_time = 0; arg_max_per_node = 0; arg_workdir = current_workdir; arg_stdout = "/dev/null"; arg_stderr = "stdout"; arg_umask = getumask(); arg_debug = 0; arg_jobflags = 0; arg_groupid = UINT64_UNSET; arg_tmpdir = 0; arg_blacklist = NULL; arg_whitelist = NULL; arg_prerequisites = ""; arg_tags = NULL; arg_gpu = 0; arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP"); if (!arg_mysql_default_group) arg_mysql_default_group = MXQ_MYSQL_DEFAULT_GROUP; arg_mysql_default_file = getenv("MXQ_MYSQL_DEFAULT_FILE"); if (!arg_mysql_default_file) arg_mysql_default_file = MXQ_MYSQL_DEFAULT_FILE; /******************************************************************/ mx_getopt_init(&optctl, argc-1, &argv[1], opts); optctl.flags = MX_FLAG_STOPONNOOPT; while ((opt=mx_getopt(&optctl, &i)) != MX_GETOPT_END) { if (opt == MX_GETOPT_ERROR) { exit(EX_USAGE); } switch (opt) { case 'V': mxq_print_generic_version(); exit(EX_USAGE); case 'h': print_usage(); exit(EX_USAGE); case 5: arg_debug = 1; mx_log_level_set(MX_LOG_DEBUG); break; case 'v': if (!arg_debug) mx_log_level_set(MX_LOG_INFO); break; case 'g': if (arg_groupid == 0) { mx_log_crit("--group-id: invalid use after --new-group."); exit(EX_CONFIG); } if (mx_strtou64(optctl.optarg, &arg_groupid) < 0) { mx_log_crit("--group-id '%s': %m", optctl.optarg); exit(EX_CONFIG); } if (arg_groupid == 0) { mx_log_crit("--group-id '%s': Invalid group-id.", optctl.optarg); exit(EX_CONFIG); } break; case 'n': if (arg_groupid != UINT64_UNSET && arg_groupid != 0) { mx_log_crit("--new-group: invalid use. group-id already set to %lu.", arg_groupid); exit(EX_CONFIG); } arg_groupid = 0; break; case 'p': if (mx_strtou16(optctl.optarg, &arg_priority) < 0) { mx_log_crit("--priority '%s': %m", optctl.optarg); exit(EX_CONFIG); } break; case 'N': if (!(*optctl.optarg)) { mx_log_crit("--group-name '%s': String is empty.", optctl.optarg); exit(EX_CONFIG); } arg_group_name = optctl.optarg; break; case 'a': p = strchr(optctl.optarg, ' '); if (p) { mx_log_crit("--command-alias '%s': String contains whitespace characters.", optctl.optarg); exit(EX_CONFIG); } if (!(*optctl.optarg)) { mx_log_crit("--command-alias '%s': String is empty.", optctl.optarg); exit(EX_CONFIG); } arg_program_name = optctl.optarg; break; case 2: mx_log_warning("option --group_priority is deprecated. please use --group-priority instead."); case 'P': if (mx_strtou16(optctl.optarg, &arg_group_priority) < 0) { mx_log_crit("--group-priority '%s': %m", optctl.optarg); exit(EX_CONFIG); } break; case 1: case 'j': if (mx_strtou16(optctl.optarg, &arg_threads) < 0) { mx_log_crit("--processors '%s': %m", optctl.optarg); exit(EX_CONFIG); } break; case 'm': if (mx_strtou64(optctl.optarg, &arg_memory) < 0) { unsigned long long int bytes; if(mx_strtobytes(optctl.optarg, &bytes) < 0) { mx_log_crit("--memory '%s': %m", optctl.optarg); exit(EX_CONFIG); } arg_memory = bytes/1024/1024; } break; case 4: mx_log_warning("option '--time' is deprecated. please use '--runtime' or '-t' in future calls."); case 't': if (mx_strtou32(optctl.optarg, &arg_time) < 0) { unsigned long long int minutes; if(mx_strtominutes(optctl.optarg, &minutes) < 0) { mx_log_crit("--runtime '%s': %m", optctl.optarg); exit(EX_CONFIG); } if ((unsigned long long int)(uint32_t)minutes != minutes) { errno = ERANGE; mx_log_crit("--runtime '%s': %m", optctl.optarg); exit(EX_CONFIG); } arg_time = (uint32_t)minutes; } break; case 6: if (mx_strtou16(optctl.optarg, &arg_max_per_node) < 0) { mx_log_crit("--max-jobs-per-node '%s': %m", optctl.optarg); exit(EX_CONFIG); } break; case 'w': if (!(*optctl.optarg)) { mx_log_crit("--workdir '%s': String is empty.", optctl.optarg); exit(EX_CONFIG); } if (optctl.optarg[0] != '/') { mx_log_crit("--workdir '%s': workdir is a relative path. please use absolute path.", optctl.optarg); exit(EX_CONFIG); } arg_workdir = optctl.optarg; break; case 'o': if (!(*optctl.optarg)) { mx_log_crit("--stdout '%s': String is empty.", optctl.optarg); exit(EX_CONFIG); } arg_stdout = optctl.optarg; break; case 'e': if (!(*optctl.optarg)) { mx_log_crit("--stderr '%s': String is empty.", optctl.optarg); exit(EX_CONFIG); } arg_stderr = optctl.optarg; break; case 'u': if (mx_strtou32(optctl.optarg, &arg_umask) < 0) { mx_log_crit("--umask '%s': %m", optctl.optarg); exit(EX_CONFIG); } break; case 'M': arg_mysql_default_file = optctl.optarg; break; case 'S': arg_mysql_default_group = optctl.optarg; break; case 7: { unsigned long long int bytes; if(mx_strtobytes(optctl.optarg, &bytes) < 0) { mx_log_crit("--tmpdir '%s': %m", optctl.optarg); exit(EX_CONFIG); } arg_tmpdir=(bytes+1024*1024*1024-1)/1024/1024/1024; } break; case 8: arg_blacklist = optctl.optarg; break; case 9: arg_whitelist = optctl.optarg; break; case 10: arg_prerequisites = optctl.optarg; break; case 11: arg_tags = optctl.optarg; break; case 12: arg_gpu = 1; break; } } MX_GETOPT_FINISH(optctl, argc, argv); if (argc < 1) { print_usage(); exit(EX_USAGE); } if (*arg_prerequisites != 0) verify_expression(arg_prerequisites); /* from this point values in argc,argv are the ones of the cluster job */ if (!arg_program_name) { p = strchr(argv[0], ' '); if (p) { mx_log_crit("<command> contains whitespace characters. Please set --command-alias if this is intended."); exit(EX_CONFIG); } arg_program_name = argv[0]; } if (!arg_time) { arg_time = 15; mx_log_warning("option '--runtime' or '-t' not used. Your job will get killed if it runs longer than the default of %d minutes.", arg_time); } if (arg_time > 60*24) { mx_log_warning("option '--runtime' specifies a runtime longer than 24h. Your job may get killed. Be sure to implement some check pointing."); } if (!(*arg_program_name)) { mx_log_crit("<command> is empty. Please check usage with '%s --help'.", program_invocation_short_name); exit(EX_CONFIG); } /******************************************************************/ if (*arg_stdout != '/') { res = asprintf(&arg_stdout_absolute, "%s/%s", arg_workdir, arg_stdout); assert(res != -1); arg_stdout = arg_stdout_absolute; } if (mx_streq(arg_stderr, "stdout")) { arg_stderr = arg_stdout; } if (*arg_stderr != '/') { res = asprintf(&arg_stderr_absolute, "%s/%s", arg_workdir, arg_stderr); assert(res != -1); arg_stderr = arg_stderr_absolute; } arg_args = mx_strvec_to_str(argv); assert(arg_args); if ( arg_blacklist != NULL ) { struct keywordset *kws = keywordset_new(arg_blacklist); blacklist = keywordset_get(kws); keywordset_free(kws); } else { blacklist = mx_strdup_forever(""); } if ( arg_whitelist != NULL ) { struct keywordset *kws = keywordset_new(arg_whitelist); whitelist = keywordset_get(kws); keywordset_free(kws); } else { whitelist = mx_strdup_forever(""); } { struct keywordset *kws = keywordset_new(NULL); if (arg_tags) keywordset_add(kws, arg_tags); if (arg_gpu) keywordset_add(kws, "gpu"); tags = keywordset_get(kws); keywordset_free(kws); } /******************************************************************/ memset(&job, 0, sizeof(job)); memset(&group, 0, sizeof(group)); /* connect job and group */ job.group_ptr = &group; /******************************************************************/ group.group_name = arg_group_name; group.group_priority = arg_group_priority; group.job_threads = arg_threads; group.job_memory = arg_memory; group.job_time = arg_time; group.job_tmpdir_size = arg_tmpdir; group.job_gpu = arg_gpu; group.group_blacklist = blacklist; group.group_whitelist = whitelist; group.prerequisites = arg_prerequisites; group.tags = tags; group.job_max_per_node = arg_max_per_node; job.job_flags = arg_jobflags; job.job_priority = arg_priority; job.job_workdir = arg_workdir; job.job_stdout = arg_stdout; job.job_stderr = arg_stderr; job.job_umask = arg_umask; job.job_argc = argc; job.job_argv = argv; job.job_argv_str = arg_args; /******************************************************************/ res = getresuid(&ruid, &euid, &suid); assert(res != -1); passwd = getpwuid(ruid); assert(passwd != NULL); res = getresgid(&rgid, &egid, &sgid); assert(res != -1); grp = getgrgid(rgid); assert(grp != NULL); group.user_uid = ruid; group.user_name = passwd->pw_name; group.user_gid = rgid; group.user_group = grp->gr_name; /******************************************************************/ group.job_command = arg_program_name; job.host_submit = mx_hostname(); /******************************************************************/ res = mx_mysql_initialize(&mysql); assert(res == 0); mx_mysql_option_set_default_file(mysql, arg_mysql_default_file); mx_mysql_option_set_default_group(mysql, arg_mysql_default_group); res = mx_mysql_connect_forever(&mysql); assert(res == 0); mx_log_info("MySQL: Connection to database established."); res = mxq_submit_task(mysql, &job, flags, arg_groupid); mx_mysql_finish(&mysql); mx_log_info("MySQL: Connection to database closed."); if (res < 0) { if (res != -ENOENT && res != -EDQUOT) mx_log_err("Job submission failed: %m"); return 1; } printf("mxq_group_id=%" PRIu64 " \n", group.group_id); printf("mxq_group_name=%s\n", group.group_name); printf("mxq_job_id=%" PRIu64 "\n", job.job_id); return 0; }