mxqd: per-job TMPDIR support (reviewed, re-formatted patch)

What the patch does to mxqd.c
  * init_child_process(): TMPDIR is server->initial_tmpdir when
    group->job_tmpdir_size is 0, otherwise MXQ_JOB_TMPDIR_MNTDIR/<job_id>.
  * start_job(): for job_tmpdir_size > 0 the helper
    LIBEXECDIR/mxq/create_job_tmpdir is executed with MXQ_JOBID, MXQ_SIZE and
    MXQ_UID in its environment; the job start is abandoned (return 0) when
    the helper cannot be forked/waited for or ends with a non-zero wait status.
  * start_user(): a group is skipped when mx_df(MXQ_JOB_TMPDIR_FS "/.")
    divided by 1024^3 is smaller than job_tmpdir_size + 20.
  * job_has_finished(): the tmpdir of the finished job is unmounted/removed.
  * recover_from_previous_crash(): all-digit directory names below
    MXQ_JOB_TMPDIR_MNTDIR for which server_get_job_list_by_job_id() returns
    NULL are unmounted/removed.

Review fixes applied to the added lines
  1. server_umount_stale_job_mountdirs(): the loop header had been fused into
     "for (i=0;id_name, &job_id)) {"; restored to the for statement plus the
     job_mountdirs_is_valid_name_parse() test on namelist[i]->d_name.
  2. start_job(): waitpid() is retried on EINTR and its result is checked, so
     'status' is never read when it was not written.
  3. start_job(): a fork() failure is logged before the MySQL reconnect, so
     that %m still describes the fork() error.
  4. start_job(): the child leaves with _exit(1) after a failed execve().
  5. start_job(): "argv[0] = ..., argv[1] = NULL;" used a comma where a
     semicolon was meant; now two statements.
  6. job_has_finished(): unmount_job_tmpdir() is only called for groups with
     job_tmpdir_size > 0. For other groups start_job() never ran the helper,
     and the unconditional call logged a warning for every finished job.
  7. job_mountdirs_is_valid_name_parse(): isdigit() arguments are cast to
     unsigned char.
  Hunk headers were recomputed for the changed line counts. The blob id on the
  "index" line still names the originally submitted post-image.

Reconstruction notes (please verify against the original commit)
  * The patch text was recovered from a copy with collapsed line breaks and
    whitespace. Indentation and blank context lines were re-derived from the
    hunk line counts; runs of spaces inside lines could not be recovered.
    Apply with whitespace-tolerant matching (e.g. "patch -l").
  * In the first hunk the header names of all #include lines are missing and
    one blank context line could not be placed; that hunk must be completed
    by hand before the patch can apply.

diff --git a/mxqd.c b/mxqd.c
index d8fd333d..1e827d0f 100644
--- a/mxqd.c
+++ b/mxqd.c
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -51,6 +52,9 @@
 # define MXQ_INITIAL_TMPDIR "/tmp"
 #endif
 
+#define MXQ_JOB_TMPDIR_MNTDIR "/dev/shm/mxqd/mnt/job"
+#define MXQ_JOB_TMPDIR_FS "/scratch/local2"
+
 #define RUNNING_AS_ROOT (getuid() == 0)
 
 static int global_sigint_cnt=0;
@@ -771,7 +775,6 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job)
     mx_setenv_forever("USERNAME", group->user_name);
     mx_setenv_forever("LOGNAME", group->user_name);
     mx_setenv_forever("PATH", server->initial_path);
-    mx_setenv_forever("TMPDIR", server->initial_tmpdir);
     mx_setenv_forever("PWD", job->job_workdir);
     mx_setenv_forever("HOME", passwd->pw_dir);
     mx_setenv_forever("SHELL", passwd->pw_shell);
@@ -785,7 +788,11 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job)
     mx_setenv_forever("MXQ_HOSTID", server->host_id);
     mx_setenv_forever("MXQ_HOSTNAME", server->hostname);
     mx_setenv_forever("MXQ_SERVERID", server->daemon_name);
-
+    if (group->job_tmpdir_size == 0) {
+        mx_setenv_forever("TMPDIR", server->initial_tmpdir);
+    } else {
+        mx_setenvf_forever("TMPDIR", "%s/%lu", MXQ_JOB_TMPDIR_MNTDIR, job->job_id);
+    }
     fh = open("/proc/self/loginuid", O_WRONLY|O_TRUNC);
     if (fh == -1) {
         mx_log_err("job=%s(%d):%lu:%lu open(%s) failed: %m",
@@ -1148,8 +1155,11 @@ unsigned long start_job(struct mxq_group_list *glist)
 
     struct mxq_daemon *daemon;
 
+    static char create_job_tmpdir_script[] = LIBEXECDIR "/mxq/create_job_tmpdir";
+
     pid_t pid;
     int res;
+    int status;
 
     assert(glist);
     assert(glist->user);
@@ -1167,6 +1177,48 @@ unsigned long start_job(struct mxq_group_list *glist)
 
     mx_log_info(" job=%s(%d):%lu:%lu :: new job loaded.",
             group->user_name, group->user_uid, group->group_id, job->job_id);
+
+    /* run the create_job_tmpdir helper and wait for it; any failure aborts the job start */
+    if (group->job_tmpdir_size > 0) {
+        pid_t waited;
+
+        mx_mysql_disconnect(server->mysql);
+        pid = fork();
+        if (pid==0) {
+            char *argv[2];
+            char *envp[4];
+            argv[0] = create_job_tmpdir_script;
+            argv[1] = NULL;
+            mx_asprintf_forever(&envp[0], "MXQ_JOBID=%lu", job->job_id);
+            mx_asprintf_forever(&envp[1], "MXQ_SIZE=%u", group->job_tmpdir_size);
+            mx_asprintf_forever(&envp[2], "MXQ_UID=%d", group->user_uid);
+            envp[3] = NULL;
+            execve(create_job_tmpdir_script,argv,envp);
+            mx_log_fatal("exec %s : %m",create_job_tmpdir_script);
+            /* _exit: skip atexit handlers and stdio flushing inherited from the parent */
+            _exit(1);
+        }
+        if (pid < 0) {
+            /* log before reconnecting so that %m still reports the fork() error */
+            mx_log_err("fork: %m");
+        }
+        mx_mysql_connect_forever(&(server->mysql));
+        if (pid < 0) {
+            return(0);
+        }
+        do {
+            waited = waitpid(pid, &status, 0);
+        } while (waited == -1 && errno == EINTR);
+        if (waited == -1) {
+            /* status was not written by waitpid(); do not evaluate it */
+            mx_log_err("waitpid: %m");
+            return 0;
+        }
+        if (status) {
+            return 0;
+        }
+    }
+
     cpuset_init_job(&job->host_cpu_set, &server->cpu_set_available, &server->cpu_set_running, glist->slots_per_job);
     mx_free_null(job->host_cpu_set_str);
     job->host_cpu_set_str = mx_cpuset_to_str(&job->host_cpu_set);
@@ -1245,6 +1297,8 @@ unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_
     unsigned long slots_started = 0;
     int jobs_started = 0;
 
+    unsigned long df_scratch;
+
     assert(ulist);
     assert(ulist->server);
     assert(ulist->groups);
@@ -1260,6 +1314,8 @@ unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_
     mx_log_debug(" user=%s(%d) slots_to_start=%ld job_limit=%d :: trying to start jobs for user.",
             group->user_name, group->user_uid, slots_to_start, job_limit);
 
+    df_scratch=mx_df(MXQ_JOB_TMPDIR_FS "/.");
+
     for (glist = ulist->groups; glist && slots_to_start > 0 && (!job_limit || jobs_started < job_limit); glist = gnext) {
         group = &glist->group;
 
@@ -1281,6 +1337,10 @@ unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_
             goto start_user_continue;
         }
 
+        if (df_scratch/1024/1024/1024 < group->job_tmpdir_size + 20) {
+            goto start_user_continue;
+        }
+
         if (group->group_priority < prio) {
             if (started) {
                 goto start_user_rewind;
@@ -1760,6 +1820,40 @@ static void rename_outfiles(struct mxq_server *server, struct mxq_group *group,
     }
 }
 
+/* Build "<MXQ_JOB_TMPDIR_MNTDIR>/<job_id>"; unmount_job_tmpdir() releases it with free(). */
+static char *job_tmpdir_path(unsigned long job_id) {
+    char *pathname;
+    mx_asprintf_forever(&pathname, "%s/%lu", MXQ_JOB_TMPDIR_MNTDIR, job_id);
+    return pathname;
+}
+
+/*
+ * Remove the directory pathname. If rmdir() fails with EBUSY, unmount
+ * pathname and try rmdir() once more. Returns 0 on success, otherwise the
+ * non-zero result of the failing rmdir()/umount() call.
+ */
+static int unmount_and_remove(char *pathname) {
+    int res;
+    res = rmdir(pathname);
+    if (res && errno==EBUSY) {
+        res = umount(pathname);
+        if (res == 0) {
+            res = rmdir(pathname);
+        }
+    }
+    return res;
+}
+
+/* Unmount and remove the tmpdir of job_id; a failure is only logged. */
+static void unmount_job_tmpdir(unsigned long job_id) {
+    char *pathname;
+    pathname=job_tmpdir_path(job_id);
+    if (unmount_and_remove(pathname)) {
+        mx_log_warning("failed to unmount/remove stale job tmpdir %s: %m", pathname);
+    }
+    free(pathname);
+}
+
 static int job_has_finished(struct mxq_server *server, struct mxq_group *group, struct mxq_job_list *jlist)
 {
     int cnt;
@@ -1767,6 +1861,11 @@ static int job_has_finished(struct mxq_server *server, struct mxq_group *group,
 
     job=&jlist->job;
 
+    /* start_job() runs the tmpdir helper only for groups with job_tmpdir_size > 0 */
+    if (group->job_tmpdir_size > 0) {
+        unmount_job_tmpdir(job->job_id);
+    }
+
     mxq_set_job_status_exited(server->mysql, job);
 
     rename_outfiles(server, group, job);
@@ -2227,6 +2326,62 @@ int load_running_groups(struct mxq_server *server)
     return total;
 }
 
+/*
+ * Return 1 if name consists of decimal digits only (at least one), else 0.
+ * If job_id is not NULL, the parsed value is stored there on success.
+ */
+static int job_mountdirs_is_valid_name_parse(const char *name, unsigned long int *job_id) {
+    const char *c=name;
+    if (!*c)
+        return 0;
+    /* <ctype.h> functions require a value representable as unsigned char */
+    if (!isdigit((unsigned char)*c++))
+        return 0;
+    while(isdigit((unsigned char)*c)) {
+        c++;
+    }
+    if (*c) {
+        return 0;
+    }
+    if (job_id) {
+        *job_id = strtoul(name, NULL, 10);
+    }
+    return 1;
+}
+
+/* scandir() filter: accept only entries whose name is all digits. */
+static int job_mountdirs_is_valid_name(const struct dirent *d)
+{
+    return job_mountdirs_is_valid_name_parse(d->d_name,NULL);
+}
+
+/*
+ * Scan MXQ_JOB_TMPDIR_MNTDIR and unmount/remove every all-digit entry whose
+ * job id is not found by server_get_job_list_by_job_id().
+ */
+static void server_umount_stale_job_mountdirs(struct mxq_server *server) {
+
+    int entries;
+    struct dirent **namelist;
+    unsigned long int job_id;
+    int i;
+
+    entries=scandir(MXQ_JOB_TMPDIR_MNTDIR,&namelist,&job_mountdirs_is_valid_name,&alphasort);
+    if (entries<0) {
+        mx_log_err("scandir %s: %m", MXQ_JOB_TMPDIR_MNTDIR);
+        return;
+    }
+    for (i=0;i<entries;i++) {
+        if (job_mountdirs_is_valid_name_parse(namelist[i]->d_name, &job_id)) {
+            if (server_get_job_list_by_job_id(server, job_id) == NULL) {
+                unmount_job_tmpdir(job_id);
+            }
+        }
+        free(namelist[i]);
+    }
+    free(namelist);
+}
+
 int recover_from_previous_crash(struct mxq_server *server)
 {
     assert(server);
@@ -2283,6 +2438,8 @@ int recover_from_previous_crash(struct mxq_server *server)
     if (res < 0)
         mx_log_err("recover: failed to update daemon instance statistics: %m");
 
+    server_umount_stale_job_mountdirs(server);
+
     return res;
 }
 