Skip to content

Commit

Permalink
mxqd: Honor --tmpdir setting
Browse files Browse the repository at this point in the history
Only accept jobs without --tmpdir when we have 20GB free
on MXQ_JOB_TMPDIR_FS or jobs with --tmpdir when we have 20GB more free
than the specified size.

If --tmpdir was given, create and mount a filesystem for the job using
the external helper script `create_job_tmpdir`. The job gets the
environment variable TMPDIR set pointing to that directory.

Add cleanup code to remove the mountpoints and filesystem images after
the job has completed or is lost.
  • Loading branch information
donald committed Jan 29, 2020
1 parent f00d24c commit 91fdfb6
Showing 1 changed file with 125 additions and 2 deletions.
127 changes: 125 additions & 2 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/prctl.h>
#include <sys/mount.h>

#include <signal.h>
#include <pwd.h>
Expand Down Expand Up @@ -51,6 +52,9 @@
# define MXQ_INITIAL_TMPDIR "/tmp"
#endif

#define MXQ_JOB_TMPDIR_MNTDIR "/dev/shm/mxqd/mnt/job"
#define MXQ_JOB_TMPDIR_FS "/scratch/local2"

#define RUNNING_AS_ROOT (getuid() == 0)

static int global_sigint_cnt=0;
Expand Down Expand Up @@ -771,7 +775,6 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job)
mx_setenv_forever("USERNAME", group->user_name);
mx_setenv_forever("LOGNAME", group->user_name);
mx_setenv_forever("PATH", server->initial_path);
mx_setenv_forever("TMPDIR", server->initial_tmpdir);
mx_setenv_forever("PWD", job->job_workdir);
mx_setenv_forever("HOME", passwd->pw_dir);
mx_setenv_forever("SHELL", passwd->pw_shell);
Expand All @@ -785,7 +788,11 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job)
mx_setenv_forever("MXQ_HOSTID", server->host_id);
mx_setenv_forever("MXQ_HOSTNAME", server->hostname);
mx_setenv_forever("MXQ_SERVERID", server->daemon_name);

if (group->job_tmpdir_size == 0) {
mx_setenv_forever("TMPDIR", server->initial_tmpdir);
} else {
mx_setenvf_forever("TMPDIR", "%s/%lu", MXQ_JOB_TMPDIR_MNTDIR, job->job_id);
}
fh = open("/proc/self/loginuid", O_WRONLY|O_TRUNC);
if (fh == -1) {
mx_log_err("job=%s(%d):%lu:%lu open(%s) failed: %m",
Expand Down Expand Up @@ -1148,8 +1155,11 @@ unsigned long start_job(struct mxq_group_list *glist)

struct mxq_daemon *daemon;

static char create_job_tmpdir_script[] = LIBEXECDIR "/mxq/create_job_tmpdir";

pid_t pid;
int res;
int status;

assert(glist);
assert(glist->user);
Expand All @@ -1167,6 +1177,34 @@ unsigned long start_job(struct mxq_group_list *glist)
mx_log_info(" job=%s(%d):%lu:%lu :: new job loaded.",
group->user_name, group->user_uid, group->group_id, job->job_id);


if (group->job_tmpdir_size > 0) {
mx_mysql_disconnect(server->mysql);
pid = fork();
if (pid==0) {
char *argv[2];
char *envp[4];
argv[0] = create_job_tmpdir_script,
argv[1] = NULL;
mx_asprintf_forever(&envp[0], "MXQ_JOBID=%lu", job->job_id);
mx_asprintf_forever(&envp[1], "MXQ_SIZE=%u", group->job_tmpdir_size);
mx_asprintf_forever(&envp[2], "MXQ_UID=%d", group->user_uid);
envp[3] = NULL;
execve(create_job_tmpdir_script,argv,envp);
mx_log_fatal("exec %s : %m",create_job_tmpdir_script);
exit(1);
}
mx_mysql_connect_forever(&(server->mysql));
if (pid < 0) {
mx_log_err("fork: %m");
return(0);
}
waitpid(pid, &status, 0);
if (status) {
return 0;
}
}

cpuset_init_job(&job->host_cpu_set, &server->cpu_set_available, &server->cpu_set_running, glist->slots_per_job);
mx_free_null(job->host_cpu_set_str);
job->host_cpu_set_str = mx_cpuset_to_str(&job->host_cpu_set);
Expand Down Expand Up @@ -1245,6 +1283,8 @@ unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_
unsigned long slots_started = 0;
int jobs_started = 0;

unsigned long df_scratch;

assert(ulist);
assert(ulist->server);
assert(ulist->groups);
Expand All @@ -1260,6 +1300,8 @@ unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_
mx_log_debug(" user=%s(%d) slots_to_start=%ld job_limit=%d :: trying to start jobs for user.",
group->user_name, group->user_uid, slots_to_start, job_limit);

df_scratch=mx_df(MXQ_JOB_TMPDIR_FS "/.");

for (glist = ulist->groups; glist && slots_to_start > 0 && (!job_limit || jobs_started < job_limit); glist = gnext) {

group = &glist->group;
Expand All @@ -1281,6 +1323,10 @@ unsigned long start_user(struct mxq_user_list *ulist, int job_limit, long slots_
goto start_user_continue;
}

if (df_scratch/1024/1024/1024 < group->job_tmpdir_size + 20) {
goto start_user_continue;
}

if (group->group_priority < prio) {
if (started) {
goto start_user_rewind;
Expand Down Expand Up @@ -1760,13 +1806,42 @@ static void rename_outfiles(struct mxq_server *server, struct mxq_group *group,
}
}

static char *job_tmpdir_path(unsigned long job_id) {
char *pathname;
mx_asprintf_forever(&pathname, "%s/%lu", MXQ_JOB_TMPDIR_MNTDIR, job_id);
return pathname;
}

static int unmount_and_remove(char *pathname) {
int res;
res = rmdir(pathname);
if (res && errno==EBUSY) {
res = umount(pathname);
if (res == 0) {
res = rmdir(pathname);
}
}
return res;
}

static void unmount_job_tmpdir(unsigned long job_id) {
char *pathname;
pathname=job_tmpdir_path(job_id);
if (unmount_and_remove(pathname)) {
mx_log_warning("failed to unmount/remove stale job tmpdir %s: %m", pathname);
}
free(pathname);
}

static int job_has_finished(struct mxq_server *server, struct mxq_group *group, struct mxq_job_list *jlist)
{
int cnt;
struct mxq_job *job;

job=&jlist->job;

unmount_job_tmpdir(job->job_id);

mxq_set_job_status_exited(server->mysql, job);

rename_outfiles(server, group, job);
Expand Down Expand Up @@ -2227,6 +2302,52 @@ int load_running_groups(struct mxq_server *server)
return total;
}

static int job_mountdirs_is_valid_name_parse(const char *name, unsigned long int *job_id) {
const char *c=name;
if (!*c)
return 0;
if (!isdigit(*c++))
return 0;
while(isdigit(*c)) {
c++;
}
if (*c) {
return 0;
}
if (job_id) {
*job_id = strtoul(name, NULL, 10);
}
return 1;
}

static int job_mountdirs_is_valid_name(const struct dirent *d)
{
return job_mountdirs_is_valid_name_parse(d->d_name,NULL);
}

static void server_umount_stale_job_mountdirs(struct mxq_server *server) {

int entries;
struct dirent **namelist;
unsigned long int job_id;
int i;

entries=scandir(MXQ_JOB_TMPDIR_MNTDIR,&namelist,&job_mountdirs_is_valid_name,&alphasort);
if (entries<0) {
mx_log_err("scandir %s: %m", MXQ_JOB_TMPDIR_MNTDIR);
return;
}
for (i=0;i<entries;i++) {
if (job_mountdirs_is_valid_name_parse(namelist[i]->d_name, &job_id)) {
if (server_get_job_list_by_job_id(server, job_id) == NULL) {
unmount_job_tmpdir(job_id);
}
}
free(namelist[i]);
}
free(namelist);
}

int recover_from_previous_crash(struct mxq_server *server)
{
assert(server);
Expand Down Expand Up @@ -2283,6 +2404,8 @@ int recover_from_previous_crash(struct mxq_server *server)
if (res < 0)
mx_log_err("recover: failed to update daemon instance statistics: %m");

server_umount_stale_job_mountdirs(server);

return res;
}

Expand Down

0 comments on commit 91fdfb6

Please sign in to comment.