diff --git a/Makefile b/Makefile
index 3020bb79..30a8f5d8 100644
--- a/Makefile
+++ b/Makefile
@@ -596,6 +596,11 @@ build: mxqps
 clean: CLEAN += mxqps
 
+### script helper -----------------------------------------------------
+
+install:: helper/create_job_tmpdir
+	$(call quiet-install,0755,$^,${DESTDIR}${LIBEXECDIR}/mxq/create_job_tmpdir)
+
 ########################################################################
 
 fix: FIX += mxqdctl-hostconfig.sh
diff --git a/helper/create_job_tmpdir b/helper/create_job_tmpdir
new file mode 100755
index 00000000..3d74e3e1
--- /dev/null
+++ b/helper/create_job_tmpdir
@@ -0,0 +1,42 @@
+#! /usr/bin/bash
+
+# Input (environment):
+#
+#   MXQ_JOBID : job ident
+#   MXQ_SIZE  : size in GB
+#   MXQ_UID   : uid
+
+# Output:
+#
+#   /dev/shm/mxqd/mnt/job/$MXQ_JOBID mounted, space from /scratch/local2
+
+tmpdir=/scratch/local2/mxqd/tmp
+mntdir=/dev/shm/mxqd/mnt/job
+filename=$tmpdir/$MXQ_JOBID.tmp
+mountpoint=$mntdir/$MXQ_JOBID
+
+umask 006
+mkdir -p $tmpdir
+mkdir -p $mntdir
+
+status=1
+
+if fallocate -l ${MXQ_SIZE}G $filename; then
+    if loopdevice=$(losetup --find --show $filename); then
+        if mkfs.ext4 \
+            -q \
+            -m 0 \
+            -E nodiscard,mmp_update_interval=300,lazy_journal_init=1,root_owner=$MXQ_UID:0 \
+            -O '64bit,ext_attr,filetype,^has_journal,huge_file,inline_data,^mmp,^quota,sparse_super2' \
+            $loopdevice \
+            && mkdir -p $mountpoint && mount -o data=writeback,barrier=0 $loopdevice $mountpoint; then
+            rmdir $mountpoint/lost+found
+            status=0
+        fi
+        losetup -d $loopdevice
+    fi
+    rm $filename
+else
+    test -e $filename && rm $filename
+fi
+exit $status
diff --git a/mx_util.c b/mx_util.c
index 0dd65101..b686ba11 100644
--- a/mx_util.c
+++ b/mx_util.c
@@ -13,6 +13,7 @@
 #include 
 #include 
+#include 
 #include 
 
 #include "mx_log.h"
@@ -1331,3 +1332,14 @@ void _mx_sort_linked_list (void **list, int (*cmp)(void *o1,void *o2), void **
     }
     *list=sorted;
 }
+
+unsigned long mx_df(const char *path) {
+    int res;
+    struct statfs s;
+
+    res=statfs(path, &s);
+    if (res<0) {
+        return 0;
+    }
+    return s.f_bavail*s.f_frsize;
+}
diff --git a/mx_util.h b/mx_util.h
index 46107414..bf31f4ee 100644
--- a/mx_util.h
+++ b/mx_util.h
@@ -169,5 +169,6 @@ int mx_daemon(int nochdir, int noclose);
 
 void _mx_sort_linked_list(void **list, int (*cmp)(void *o1,void *o2), void ** (*getnextptr)(void *o));
 #define mx_sort_linked_list(list,cmp,getnextptr) _mx_sort_linked_list((void **)(list),(int (*)(void *,void *))(cmp),(void ** (*)(void *))(getnextptr))
+unsigned long mx_df(const char *path);
 
 #endif
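mx_df() returns the number of bytes still available to unprivileged users on the filesystem behind path (f_bavail * f_frsize), or 0 if statfs() fails. mxqd uses it further down to keep roughly 20 GiB of headroom on /scratch/local2 before starting a job that wants a per-job TMPDIR. A minimal sketch of that check, written against this patch; the function name and the named constant are illustrative only, the path and the 20 GiB value come from the mxqd.c changes below:

    #include <stdint.h>
    #include "mx_util.h"

    /* Sketch only: mirrors the free-space test added to start_user() in mxqd.c.
     * "headroom_gib" is just a readable name for the hard-coded 20 GiB. */
    static int scratch_has_room_for(uint32_t job_tmpdir_size_gib)
    {
        unsigned long avail_bytes  = mx_df("/scratch/local2/.");  /* 0 on statfs() error */
        unsigned long headroom_gib = 20;

        return avail_bytes / 1024 / 1024 / 1024 >= job_tmpdir_size_gib + headroom_gib;
    }
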
diff --git a/mxq_group.c b/mxq_group.c
index 3f2ce487..34f8c7ac 100644
--- a/mxq_group.c
+++ b/mxq_group.c
@@ -12,7 +12,7 @@
 #include "mx_util.h"
 #include "mx_mysql.h"
 
-#define GROUP_FIELDS_CNT 32
+#define GROUP_FIELDS_CNT 33
 #define GROUP_FIELDS \
             " group_id," \
             " group_name," \
@@ -27,6 +27,7 @@
             " job_threads," \
             " job_memory," \
             " job_time," \
+            " job_tmpdir_size," \
             " job_max_per_node," \
             " group_jobs," \
             " group_jobs_inq," \
@@ -72,6 +73,7 @@ static int bind_result_group_fields(struct mx_mysql_bind *result, struct mxq_gro
     res += mx_mysql_bind_var(result, idx++, uint16, &(g->job_threads));
     res += mx_mysql_bind_var(result, idx++, uint64, &(g->job_memory));
     res += mx_mysql_bind_var(result, idx++, uint32, &(g->job_time));
+    res += mx_mysql_bind_var(result, idx++, uint32, &(g->job_tmpdir_size));
 
     res += mx_mysql_bind_var(result, idx++, uint16, &(g->job_max_per_node));
 
diff --git a/mxq_group.h b/mxq_group.h
index 1c8e0c5a..762b04ae 100644
--- a/mxq_group.h
+++ b/mxq_group.h
@@ -26,6 +26,7 @@ struct mxq_group {
     uint16_t job_threads;
     uint64_t job_memory;
     uint32_t job_time;
+    uint32_t job_tmpdir_size;   /* GB */
 
     uint16_t job_max_per_node;
diff --git a/mxq_job.c b/mxq_job.c
index 42a5dc02..ee3b3918 100644
--- a/mxq_job.c
+++ b/mxq_job.c
@@ -801,3 +801,44 @@ int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **job
 
     return res;
 }
+
+int mxq_unload_job_from_server(struct mx_mysql *mysql, struct mxq_daemon *daemon, uint64_t job_id) {
+
+    /* set a job from LOADED back to INQ. This needs to reset what
+     * mxq_assign_job_from_group_to_daemon() and mxq_set_job_status_loaded_on_server()
+     * did to the job:
+     *
+     * mxq_assign_job_from_group_to_daemon() : daemon_id, host_hostname, host_slots, server_id, job_status
+     * mxq_set_job_status_loaded_on_server() : host_id, job_status
+     *
+     * Only to be used as an error path, if we fail after loading a job during job setup,
+     * before any user code (with possible user-visible side effects) was executed.
+     */
+
+    struct mx_mysql_bind param = {0};
+    int res;
+
+    char *query =
+        "UPDATE"
+        " mxq_job"
+        " SET"
+        " daemon_id = 0,"
+        " host_hostname = '',"
+        " host_slots = 0,"
+        " server_id = '',"
+        " host_id = '',"
+        " job_status = " status_str(MXQ_JOB_STATUS_INQ)
+        " WHERE"
+        " job_id = ?"
+        " AND job_status = " status_str(MXQ_JOB_STATUS_LOADED);
+
+    mx_mysql_bind_init_param(&param, 1);
+    mx_mysql_bind_var(&param, 0, uint64, &(job_id));
+
+    res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param);
+    if (res < 0) {
+        mx_log_err("mx_mysql_do_statement(): %m");
+        return res;
+    }
+    return res;
+}
diff --git a/mxq_job.h b/mxq_job.h
index dd5c9c80..a08aa4ad 100644
--- a/mxq_job.h
+++ b/mxq_job.h
@@ -96,4 +96,5 @@ int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j);
 int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mxq_job **jobs_result, uint64_t group_id, struct mxq_daemon *daemon);
 int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job);
 int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **jobs_result, struct mxq_daemon *daemon);
+int mxq_unload_job_from_server(struct mx_mysql *mysql, struct mxq_daemon *daemon, uint64_t job_id);
 
 #endif
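mxq_unload_job_from_server() is the database half of a new error path: once mxqd has marked a job LOADED for itself but then cannot set it up (here: the per-job tmpdir, or the final fork fails), it hands the job back to the queue instead of leaving it stuck in LOADED. A hedged sketch of the intended calling pattern, condensed from the start_job() changes below; the wrapper itself is not part of the patch and the header names are assumed to match the rest of the tree:

    #include <stdint.h>

    #include "mx_log.h"
    #include "mx_mysql.h"
    #include "mxq_daemon.h"
    #include "mxq_job.h"

    /* Give a LOADED job back to the queue after a setup failure, before any
     * user code has run; this is what start_job() now does inline. */
    static void job_setup_failed(struct mx_mysql *mysql, struct mxq_daemon *daemon, uint64_t job_id)
    {
        if (mxq_unload_job_from_server(mysql, daemon, job_id) < 0)
            mx_log_err("job_id=%lu: could not reset job from LOADED back to INQ", job_id);
    }
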
open("/proc/self/loginuid", O_WRONLY|O_TRUNC); if (fh == -1) { mx_log_err("job=%s(%d):%lu:%lu open(%s) failed: %m", @@ -1148,8 +1155,11 @@ unsigned long start_job(struct mxq_group_list *glist) struct mxq_daemon *daemon; + static char create_job_tmpdir_script[] = LIBEXECDIR "/mxq/create_job_tmpdir"; + pid_t pid; int res; + int status; assert(glist); assert(glist->user); @@ -1167,6 +1177,35 @@ unsigned long start_job(struct mxq_group_list *glist) mx_log_info(" job=%s(%d):%lu:%lu :: new job loaded.", group->user_name, group->user_uid, group->group_id, job->job_id); + if (group->job_tmpdir_size > 0) { + mx_mysql_disconnect(server->mysql); + pid = fork(); + if (pid==0) { + char *argv[2]; + char *envp[4]; + argv[0] = create_job_tmpdir_script, + argv[1] = NULL; + mx_asprintf_forever(&envp[0], "MXQ_JOBID=%lu", job->job_id); + mx_asprintf_forever(&envp[1], "MXQ_SIZE=%u", group->job_tmpdir_size); + mx_asprintf_forever(&envp[2], "MXQ_UID=%d", group->user_uid); + envp[3] = NULL; + execve(create_job_tmpdir_script,argv,envp); + mx_log_fatal("exec %s : %m",create_job_tmpdir_script); + exit(1); + } + mx_mysql_connect_forever(&(server->mysql)); + if (pid < 0) { + mx_log_err("fork: %m"); + mxq_unload_job_from_server(server->mysql, daemon, job->job_id); + return(0); + } + waitpid(pid, &status, 0); + if (status) { + mxq_unload_job_from_server(server->mysql, daemon, job->job_id); + return 0; + } + } + cpuset_init_job(&job->host_cpu_set, &server->cpu_set_available, &server->cpu_set_running, glist->slots_per_job); mx_free_null(job->host_cpu_set_str); job->host_cpu_set_str = mx_cpuset_to_str(&job->host_cpu_set); @@ -1179,6 +1218,7 @@ unsigned long start_job(struct mxq_group_list *glist) if (pid < 0) { mx_log_err("fork: %m"); cpuset_clear_running(&job->host_cpu_set,&server->cpu_set_available); + mxq_unload_job_from_server(server->mysql, daemon, job->job_id); return 0; } else if (pid == 0) { job->host_pid = getpid(); @@ -1233,17 +1273,13 @@ unsigned long start_job(struct mxq_group_list *glist) /**********************************************************************/ -unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_to_start) +unsigned long start_user(struct mxq_user_list *ulist, long slots_to_start) { struct mxq_server *server; struct mxq_group_list *glist; - struct mxq_group_list *gnext = NULL; struct mxq_group *group; - unsigned int prio; - unsigned char started = 0; - unsigned long slots_started = 0; - int jobs_started = 0; + unsigned long df_scratch; assert(ulist); assert(ulist->server); @@ -1253,61 +1289,42 @@ unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_ glist = ulist->groups; group = &glist->group; - prio = group->group_priority; - assert(slots_to_start <= server->slots - server->slots_running); - mx_log_debug(" user=%s(%d) slots_to_start=%ld job_limit=%d :: trying to start jobs for user.", - group->user_name, group->user_uid, slots_to_start, job_limit); + mx_log_debug(" user=%s(%d) slots_to_start=%ld :: trying to start jobs for user.", + group->user_name, group->user_uid, slots_to_start); - for (glist = ulist->groups; glist && slots_to_start > 0 && (!job_limit || jobs_started < job_limit); glist = gnext) { + df_scratch=mx_df(MXQ_JOB_TMPDIR_FS "/."); - group = &glist->group; - gnext = glist->next; + for (glist = ulist->groups; glist ; glist = glist->next) { - assert(glist->jobs_running <= group->group_jobs); - assert(glist->jobs_running <= glist->jobs_max); + group = &glist->group; if (glist->jobs_running == group->group_jobs) { - goto 
@@ -1233,17 +1273,13 @@
 
 /**********************************************************************/
 
-unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_to_start)
+unsigned long start_user(struct mxq_user_list *ulist, long slots_to_start)
 {
     struct mxq_server *server;
     struct mxq_group_list *glist;
-    struct mxq_group_list *gnext = NULL;
     struct mxq_group *group;
 
-    unsigned int prio;
-    unsigned char started = 0;
-    unsigned long slots_started = 0;
-    int jobs_started = 0;
+    unsigned long df_scratch;
 
     assert(ulist);
     assert(ulist->server);
@@ -1253,61 +1289,42 @@ unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_
 
     glist = ulist->groups;
     group = &glist->group;
 
-    prio = group->group_priority;
-
     assert(slots_to_start <= server->slots - server->slots_running);
 
-    mx_log_debug(" user=%s(%d) slots_to_start=%ld job_limit=%d :: trying to start jobs for user.",
-            group->user_name, group->user_uid, slots_to_start, job_limit);
+    mx_log_debug(" user=%s(%d) slots_to_start=%ld :: trying to start jobs for user.",
+            group->user_name, group->user_uid, slots_to_start);
 
-    for (glist = ulist->groups; glist && slots_to_start > 0 && (!job_limit || jobs_started < job_limit); glist = gnext) {
+    df_scratch=mx_df(MXQ_JOB_TMPDIR_FS "/.");
 
-        group = &glist->group;
-        gnext = glist->next;
+    for (glist = ulist->groups; glist ; glist = glist->next) {
 
-        assert(glist->jobs_running <= group->group_jobs);
-        assert(glist->jobs_running <= glist->jobs_max);
+        group = &glist->group;
 
         if (glist->jobs_running == group->group_jobs) {
-            goto start_user_continue;
+            continue;
         }
         if (glist->jobs_running == glist->jobs_max) {
-            goto start_user_continue;
+            continue;
         }
         if (mxq_group_jobs_inq(group) == 0) {
-            goto start_user_continue;
+            continue;
         }
         if (glist->slots_per_job > slots_to_start) {
-            goto start_user_continue;
+            continue;
         }
-
-        if (group->group_priority < prio) {
-            if (started) {
-                goto start_user_rewind;
-            }
-            prio = group->group_priority;
+        if (df_scratch/1024/1024/1024 < group->job_tmpdir_size + 20) {
+            continue;
         }
+
         mx_log_info(" group=%s(%d):%lu slots_to_start=%ld slots_per_job=%lu :: trying to start job for group.",
                 group->user_name, group->user_uid, group->group_id, slots_to_start, glist->slots_per_job);
 
         if (start_job(glist)) {
-            slots_to_start -= glist->slots_per_job;
-            jobs_started++;
-            slots_started += glist->slots_per_job;
-
-            started = 1;
+            int slots_started = glist->slots_per_job;
+            return slots_started;
         }
-
-start_user_continue:
-        if (gnext || !started)
-            continue;
-
-start_user_rewind:
-        gnext = ulist->groups;
-        started = 0;
-        continue;
     }
-    return slots_started;
+    return 0;
 }
@@ -1342,7 +1359,7 @@ long start_user_with_least_running_global_slot_count(struct mxq_server *server)
         if (waiting && ulist->global_slots_running > global_slots_per_user)
             return -1;
 
-        slots_started = start_user(ulist, 1, slots_free);
+        slots_started = start_user(ulist, slots_free);
 
         if (slots_started)
             return slots_started;
@@ -1760,6 +1777,33 @@ static void rename_outfiles(struct mxq_server *server, struct mxq_group *group,
     }
 }
 
+static char *job_tmpdir_path(unsigned long job_id) {
+    char *pathname;
+    mx_asprintf_forever(&pathname, "%s/%lu", MXQ_JOB_TMPDIR_MNTDIR, job_id);
+    return pathname;
+}
+
+static int unmount_and_remove(char *pathname) {
+    int res;
+    res = rmdir(pathname);
+    if (res && errno==EBUSY) {
+        res = umount(pathname);
+        if (res == 0) {
+            res = rmdir(pathname);
+        }
+    }
+    return res;
+}
+
+static void unmount_job_tmpdir(unsigned long job_id) {
+    char *pathname;
+    pathname=job_tmpdir_path(job_id);
+    if (unmount_and_remove(pathname)) {
+        mx_log_warning("failed to unmount/remove stale job tmpdir %s: %m", pathname);
+    }
+    free(pathname);
+}
+
 static int job_has_finished(struct mxq_server *server, struct mxq_group *group, struct mxq_job_list *jlist)
 {
     int cnt;
@@ -1767,6 +1811,8 @@ static int job_has_finished(struct mxq_server *server, struct mxq_group *group,
 
     job=&jlist->job;
 
+    unmount_job_tmpdir(job->job_id);
+
     mxq_set_job_status_exited(server->mysql, job);
 
     rename_outfiles(server, group, job);
@@ -2227,6 +2273,52 @@ int load_running_groups(struct mxq_server *server)
     return total;
 }
 
+static int job_mountdirs_is_valid_name_parse(const char *name, unsigned long int *job_id) {
+    const char *c=name;
+    if (!*c)
+        return 0;
+    if (!isdigit(*c++))
+        return 0;
+    while(isdigit(*c)) {
+        c++;
+    }
+    if (*c) {
+        return 0;
+    }
+    if (job_id) {
+        *job_id = strtoul(name, NULL, 10);
+    }
+    return 1;
+}
+
+static int job_mountdirs_is_valid_name(const struct dirent *d)
+{
+    return job_mountdirs_is_valid_name_parse(d->d_name,NULL);
+}
+
+static void server_umount_stale_job_mountdirs(struct mxq_server *server) {
+
+    int entries;
+    struct dirent **namelist;
+    unsigned long int job_id;
+    int i;
+
+    entries=scandir(MXQ_JOB_TMPDIR_MNTDIR,&namelist,&job_mountdirs_is_valid_name,&alphasort);
+    if (entries<0) {
+        mx_log_err("scandir %s: %m", MXQ_JOB_TMPDIR_MNTDIR);
+        return;
+    }
+    for (i=0;i<entries;i++) {
+        if (job_mountdirs_is_valid_name_parse(namelist[i]->d_name, &job_id)) {
+            if (server_get_job_list_by_job_id(server, job_id) == NULL) {
+                unmount_job_tmpdir(job_id);
+            }
+        }
+        free(namelist[i]);
+    }
+    free(namelist);
+}
+
 int recover_from_previous_crash(struct mxq_server *server)
 {
     assert(server);
@@ -2283,6 +2375,8 @@ int recover_from_previous_crash(struct mxq_server *server)
     if (res < 0)
         mx_log_err("recover: failed to update daemon instance statistics: %m");
 
+    server_umount_stale_job_mountdirs(server);
+
     return res;
 }
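For the job itself the only visible change is TMPDIR: groups submitted with a tmpdir size get TMPDIR pointed at /dev/shm/mxqd/mnt/job/<job_id>, a freshly formatted ext4 image backed by /scratch/local2 and owned by the submitting user; it is unmounted and discarded when the job finishes, or by the stale-mount cleanup after a daemon restart. A small stand-alone example of a job payload using that directory (nothing mxq-specific in it, it only shows where the space ends up):

    #include <limits.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>

    int main(void)
    {
        /* /dev/shm/mxqd/mnt/job/<job_id> when the group was submitted with --tmpdir */
        const char *tmpdir = getenv("TMPDIR");
        char path[PATH_MAX];

        snprintf(path, sizeof(path), "%s/scratch-XXXXXX", tmpdir ? tmpdir : "/tmp");

        int fd = mkstemp(path);          /* lands on the job's private ext4 image */
        if (fd < 0) {
            perror("mkstemp");
            return 1;
        }
        /* ... write intermediate data here; the whole image goes away with the job ... */
        close(fd);
        unlink(path);
        return 0;
    }
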
diff --git a/mxqdump.c b/mxqdump.c
index f88c16af..8a39e9e2 100644
--- a/mxqdump.c
+++ b/mxqdump.c
@@ -123,6 +123,7 @@ static int print_group(struct mxq_group *g)
             " job_threads=%u"
             " job_memory=%lukiB"
             " job_time=%us"
+            " job_tmpdir_size=%uGB"
             " memory_load=%lu%%"
             " time_load=%lu%%"
             " max_utime=%lu"
@@ -151,6 +152,7 @@ static int print_group(struct mxq_group *g)
             g->job_threads,
             g->job_memory*1024,
             g->job_time*60,
+            g->job_tmpdir_size,
             (100UL*(uint64_t)g->stats_max_sumrss/1024UL/g->job_memory),
             (100UL*(uint64_t)g->stats_max_real.tv_sec/60UL/g->job_time),
             g->stats_max_utime.tv_sec,
diff --git a/mxqsub.c b/mxqsub.c
index fb15cb2e..f9c62b05 100644
--- a/mxqsub.c
+++ b/mxqsub.c
@@ -71,6 +71,7 @@ static void print_usage(void)
     "\n"
     "  -j, --threads=NUMBER   set number of threads (default: 1)\n"
     "  -m, --memory=SIZE      set amount of memory (default: 2G)\n"
+    "      --tmpdir=SIZE      set size of TMPDIR (default: 0)\n"
     "\n"
     "      [SIZE] may be suffixed with a combination of T, G and M\n"
     "      to specify tebibytes, gibibytes and mebibytes.\n"
@@ -164,6 +165,7 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
                 "   AND job_threads = ?"
                 "   AND job_memory = ?"
                 "   AND job_time = ?"
+                "   AND job_tmpdir_size = ?"
                 "   AND job_max_per_node = ?"
                 "   AND group_priority = ?"
                 "   AND group_status = 0"
@@ -184,9 +186,10 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g)
     res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads));
     res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory));
     res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time));
-    res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->job_max_per_node));
-    res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->group_priority));
-    res += mx_mysql_statement_param_bind(stmt, 11, uint64, &(flags));
+    res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size));
+    res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node));
+    res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
+    res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(flags));
 
     assert(res == 0);
 
     res = mx_mysql_statement_execute(stmt, &num_rows);
@@ -247,6 +250,7 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g
                 "   AND job_threads = ?"
                 "   AND job_memory = ?"
                 "   AND job_time = ?"
+                "   AND job_tmpdir_size = ?"
                 "   AND job_max_per_node = ?"
                 "   AND group_priority = ?"
                 "   AND group_status = 0"
@@ -268,10 +272,11 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g
     res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads));
     res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory));
     res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time));
-    res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->job_max_per_node));
-    res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->group_priority));
-    res += mx_mysql_statement_param_bind(stmt, 11, uint64, &(g->group_id));
-    res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(flags));
+    res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size));
+    res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node));
+    res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority));
+    res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(g->group_id));
+    res += mx_mysql_statement_param_bind(stmt, 13, uint64, &(flags));
 
     assert(res == 0);
 
     res = mx_mysql_statement_execute(stmt, &num_rows);
" AND group_status = 0" @@ -268,10 +272,11 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); - res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->job_max_per_node)); - res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->group_priority)); - res += mx_mysql_statement_param_bind(stmt, 11, uint64, &(g->group_id)); - res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(flags)); + res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size)); + res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node)); + res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority)); + res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(g->group_id)); + res += mx_mysql_statement_param_bind(stmt, 13, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -324,6 +329,7 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g " AND job_threads = ?" " AND job_memory = ?" " AND job_time = ?" + " AND job_tmpdir_size = ?" " AND job_max_per_node = ?" " AND group_priority = ?" " AND group_status = 0" @@ -349,9 +355,10 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); - res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->job_max_per_node)); - res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->group_priority)); - res += mx_mysql_statement_param_bind(stmt, 11, uint64, &(flags)); + res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size)); + res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node)); + res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority)); + res += mx_mysql_statement_param_bind(stmt, 12, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -406,6 +413,7 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g) " job_threads = ?," " job_memory = ?," " job_time = ?," + " job_tmpdir_size = ?," " job_max_per_node = ?," " group_priority = ?"); if (!stmt) { @@ -422,8 +430,9 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g) res += mx_mysql_statement_param_bind(stmt, 6, uint16, &(g->job_threads)); res += mx_mysql_statement_param_bind(stmt, 7, uint64, &(g->job_memory)); res += mx_mysql_statement_param_bind(stmt, 8, uint32, &(g->job_time)); - res += mx_mysql_statement_param_bind(stmt, 9, uint16, &(g->job_max_per_node)); - res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->group_priority)); + res += mx_mysql_statement_param_bind(stmt, 9, uint32, &(g->job_tmpdir_size)); + res += mx_mysql_statement_param_bind(stmt,10, uint16, &(g->job_max_per_node)); + res += mx_mysql_statement_param_bind(stmt,11, uint16, &(g->group_priority)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -644,6 +653,7 @@ int main(int argc, char *argv[]) char *arg_mysql_default_group; char arg_debug; char arg_jobflags; + u_int32_t arg_tmpdir; _mx_cleanup_free_ char *current_workdir = NULL; _mx_cleanup_free_ char *arg_stdout_absolute = NULL; @@ 
diff --git a/mysql/create_tables.sql b/mysql/create_tables.sql
index 8a3f4872..c3c0f9d2 100644
--- a/mysql/create_tables.sql
+++ b/mysql/create_tables.sql
@@ -15,6 +15,7 @@ CREATE TABLE IF NOT EXISTS mxq_group (
     job_threads      INT2 UNSIGNED NOT NULL DEFAULT 1,
     job_memory       INT8 UNSIGNED NOT NULL DEFAULT 1024,
     job_time         INT4 UNSIGNED NOT NULL DEFAULT 15,
+    job_tmpdir_size  INT4 UNSIGNED NOT NULL DEFAULT 0,
 
     job_max_per_node INT2 UNSIGNED NOT NULL DEFAULT 0,
diff --git a/mysql/create_trigger.sql b/mysql/create_trigger.sql
index 2f5b0b9f..e4a35dbb 100644
--- a/mysql/create_trigger.sql
+++ b/mysql/create_trigger.sql
@@ -142,6 +142,16 @@ CREATE TRIGGER mxq_update_job BEFORE UPDATE ON mxq_job
                 stats_total_real_sec_finished = stats_total_real_sec_finished + NEW.stats_real_sec
             WHERE group_id = NEW.group_id;
 
+        -- LOADED (150) -> INQ(0)
+        ELSEIF NEW.job_status = 0 AND OLD.job_status IN (150) THEN
+
+            UPDATE mxq_group SET
+                group_sum_starttime = group_sum_starttime - UNIX_TIMESTAMP(OLD.date_start) * NEW.host_slots,
+                group_jobs_inq      = group_jobs_inq + 1,
+                group_jobs_running  = group_jobs_running - 1,
+                group_slots_running = group_slots_running - OLD.host_slots
+            WHERE group_id = NEW.group_id;
+
         -- * -> NOT IN [ CANCELLING(989) | CANCELLED(990) ]
         ELSEIF NEW.job_status NOT IN (989, 990) THEN
diff --git a/test_mx_util.c b/test_mx_util.c
index 4d4c4f38..54ba7e03 100644
--- a/test_mx_util.c
+++ b/test_mx_util.c
@@ -530,6 +530,10 @@ static void test_listsort(void)
     assert(o[9].next==NULL);
 }
 
+static void test_mx_df(void) {
+    assert(mx_df("/") > 0);
+}
+
 int main(int argc, char *argv[])
 {
     test_mx_strskipwhitespaces();
@@ -550,5 +554,6 @@ int main(int argc, char *argv[])
     test_mx_strcat();
     test_mx_cpuset();
     test_listsort();
+    test_mx_df();
 
     return 0;
 }
diff --git a/web/pages/mxq/mxq.in b/web/pages/mxq/mxq.in
index e5ca8bc5..71a166a9 100755
--- a/web/pages/mxq/mxq.in
+++ b/web/pages/mxq/mxq.in
@@ -380,6 +382,8 @@ sub group_details_raw {
 	my $job_command=escapeHTML($o->{'job_command'});
 	my $group_status_text=group_status($o->{'group_status'});
 
+	my $job_tmpdir_size = $o->{'job_tmpdir_size'} ? "$o->{'job_tmpdir_size'} GiB" : '-';
+
 	return <<"__EOF__"
 group_name     : $group_name
@@ -392,10 +394,11 @@ user_name      : $o->{user_name}
 user_gid       : $o->{user_gid}
 user_group     : $o->{user_group}
 
-job_command    : $job_command
-job_threads    : $o->{job_threads}
-job_memory     : $o->{job_memory} MiB
-job_time       : $o->{job_time} minutes
+job_command     : $job_command
+job_threads     : $o->{job_threads}
+job_memory      : $o->{job_memory} MiB
+job_time        : $o->{job_time} minutes
+job_tmpdir_size : $job_tmpdir_size
 
 job_max_per_node      : $o->{job_max_per_node}
 
@@ -596,6 +599,7 @@ sub group_table {
 		group_id group_name job_threads
 		job_memory
 		job_time
+		job_tmpdir_size
 		user_name
 		group_status
 		group_jobs
@@ -609,6 +613,7 @@ sub group_table {
 		req.
threads req.
memory req.
runtime + req.
tmpdir user
name group
status jobs
@@ -627,7 +632,7 @@ sub group_table {
 	$out.=$q->Tr($q->th(\@head));
 	while (my $row=$sth->fetchrow_arrayref()) {
 		my ($group_id,$group_name,$job_threads,
-			$job_memory,$job_time,
+			$job_memory,$job_time,$job_tmpdir_size,
 			$user_name,$group_status,
 			$group_jobs,$group_jobs_inq,$group_jobs_running,$group_jobs_finished,$group_jobs_failed,$group_jobs_cancelled,$group_jobs_unknown
 		)=@$row;
@@ -638,6 +643,7 @@ sub group_table {
 			$q->td({class=>'number'},$job_threads),
 			$q->td({class=>'number'},size($job_memory*1000**2)),
 			$q->td({class=>'number'},days($job_time*60)),
+			$q->td({class=>'number'},size($job_tmpdir_size*1000*1000*1000)),
 			$q->td($q->a({href=>my_url('groups',{user_name=>$user_name})},$user_name)),
 			$q->td(group_status($group_status)),
 			$q->td({class=>'number'},$group_jobs),