Skip to content

Commit

Permalink
Merge pull request #79 from mariux64/job-tmpdir
Browse files Browse the repository at this point in the history
Add `mxqsub --tmpdir` feature
  • Loading branch information
donald authored Jan 29, 2020
2 parents fad0a72 + babc019 commit 7a601af
Show file tree
Hide file tree
Showing 15 changed files with 313 additions and 66 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ build: mxqps

clean: CLEAN += mxqps

### script helper -----------------------------------------------------

# Install the helper/create_job_tmpdir script with mode 0755 as
# ${DESTDIR}${LIBEXECDIR}/mxq/create_job_tmpdir.
install:: helper/create_job_tmpdir
$(call quiet-install,0755,$^,${DESTDIR}${LIBEXECDIR}/mxq/create_job_tmpdir)

########################################################################

fix: FIX += mxqdctl-hostconfig.sh
Expand Down
42 changes: 42 additions & 0 deletions helper/create_job_tmpdir
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#! /usr/bin/bash

# Create a size-limited temporary filesystem for a single job.
#
# A file of the requested size is allocated below $tmpdir, attached to a
# loop device, formatted as ext4 and mounted on a per-job mountpoint.
#
# Input (environment):
#
# MXQ_JOBID : job ident
# MXQ_SIZE : size in GB
# MXQ_UID : uid (passed to mkfs.ext4 as owner of the filesystem root)
#
# Output:
#
# /dev/shm/mxqd/mnt/job/$MXQ_JOBID mounted, space from /scratch/local2
#
# Exit status:
#
# 0 : filesystem created and mounted
# 1 : any step failed (or required input is missing)

tmpdir=/scratch/local2/mxqd/tmp
mntdir=/dev/shm/mxqd/mnt/job

# Refuse to run with missing input. An empty MXQ_JOBID would make
# $mountpoint equal to $mntdir itself and $filename equal to "$tmpdir/.tmp".
if [ -z "$MXQ_JOBID" ] || [ -z "$MXQ_SIZE" ] || [ -z "$MXQ_UID" ]; then
    echo "$0: MXQ_JOBID, MXQ_SIZE and MXQ_UID must be set" >&2
    exit 1
fi

filename=$tmpdir/$MXQ_JOBID.tmp
mountpoint=$mntdir/$MXQ_JOBID

umask 006
mkdir -p "$tmpdir"
mkdir -p "$mntdir"

status=1

if fallocate -l "${MXQ_SIZE}G" "$filename"; then
    if loopdevice=$(losetup --find --show "$filename"); then
        # NOTE(review): mount(8) documents -O as a filter for "mount -a";
        # -o may have been intended here. Left unchanged on purpose: verify
        # that these options are accepted for a journal-less ext4 before
        # switching, otherwise the mount could start to fail.
        if mkfs.ext4 \
                -q \
                -m 0 \
                -E "nodiscard,mmp_update_interval=300,lazy_journal_init=1,root_owner=$MXQ_UID:0" \
                -O '64bit,ext_attr,filetype,^has_journal,huge_file,inline_data,^mmp,^quota,sparse_super2' \
                "$loopdevice" \
            && mkdir -p "$mountpoint" \
            && mount -Odata=writeback,barrier=0 "$loopdevice" "$mountpoint"; then
            rmdir "$mountpoint/lost+found"
            status=0
        fi
        # Detach the loop device in both the success and the failure case.
        # NOTE(review): on success this presumably relies on the detach being
        # deferred until the filesystem is unmounted - confirm.
        losetup -d "$loopdevice"
    fi
    # Unlink the backing file in both cases as well.
    # NOTE(review): presumably the space stays in use until the loop device
    # is released - confirm.
    rm "$filename"
else
    # fallocate failed; remove a possibly left-over file. -f keeps this quiet
    # when nothing was created.
    rm -f "$filename"
fi
exit $status
12 changes: 12 additions & 0 deletions mx_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/vfs.h>
#include <fcntl.h>

#include "mx_log.h"
Expand Down Expand Up @@ -1331,3 +1332,14 @@ void _mx_sort_linked_list (void **list, int (*cmp)(void *o1,void *o2), void **
}
*list=sorted;
}

/*
 * mx_df() - space available on the filesystem containing path.
 *
 * Queries statfs(2) and returns the number of available blocks (f_bavail)
 * multiplied by the fragment size (f_frsize). If the filesystem reports a
 * fragment size of 0, the block size (f_bsize) is used instead.
 *
 * Returns 0 if statfs() fails, so callers cannot distinguish an error from
 * a full filesystem. If the product does not fit into an unsigned long the
 * result saturates at the largest unsigned long instead of wrapping around.
 */
unsigned long mx_df(const char *path) {
    struct statfs s;
    unsigned long blocksize;

    if (statfs(path, &s) < 0) {
        return 0;
    }

    if (s.f_frsize > 0) {
        blocksize = (unsigned long)s.f_frsize;
    } else if (s.f_bsize > 0) {
        blocksize = (unsigned long)s.f_bsize;
    } else {
        return 0;
    }

    /* f_bavail may be wider than unsigned long; check before multiplying. */
    if (s.f_bavail > ~0UL / blocksize) {
        return ~0UL;
    }
    return (unsigned long)s.f_bavail * blocksize;
}
1 change: 1 addition & 0 deletions mx_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,6 @@ int mx_daemon(int nochdir, int noclose);
void _mx_sort_linked_list(void **list, int (*cmp)(void *o1,void *o2), void ** (*getnextptr)(void *o));
#define mx_sort_linked_list(list,cmp,getnextptr) _mx_sort_linked_list((void **)(list),(int (*)(void *,void *))(cmp),(void ** (*)(void *))(getnextptr))

/* Available space on the filesystem containing path, computed from the
 * statfs(2) result as available blocks times fragment size.
 * Returns 0 if statfs() fails. */
unsigned long mx_df(const char *path);

#endif
4 changes: 3 additions & 1 deletion mxq_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "mx_util.h"
#include "mx_mysql.h"

#define GROUP_FIELDS_CNT 32
#define GROUP_FIELDS_CNT 33
#define GROUP_FIELDS \
" group_id," \
" group_name," \
Expand All @@ -27,6 +27,7 @@
" job_threads," \
" job_memory," \
" job_time," \
" job_tmpdir_size," \
" job_max_per_node," \
" group_jobs," \
" group_jobs_inq," \
Expand Down Expand Up @@ -72,6 +73,7 @@ static int bind_result_group_fields(struct mx_mysql_bind *result, struct mxq_gro
res += mx_mysql_bind_var(result, idx++, uint16, &(g->job_threads));
res += mx_mysql_bind_var(result, idx++, uint64, &(g->job_memory));
res += mx_mysql_bind_var(result, idx++, uint32, &(g->job_time));
res += mx_mysql_bind_var(result, idx++, uint32, &(g->job_tmpdir_size));

res += mx_mysql_bind_var(result, idx++, uint16, &(g->job_max_per_node));

Expand Down
1 change: 1 addition & 0 deletions mxq_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct mxq_group {
uint16_t job_threads;
uint64_t job_memory;
uint32_t job_time;
uint32_t job_tmpdir_size; /* GB */

uint16_t job_max_per_node;

Expand Down
41 changes: 41 additions & 0 deletions mxq_job.c
Original file line number Diff line number Diff line change
Expand Up @@ -801,3 +801,44 @@ int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **job

return res;
}

int mxq_unload_job_from_server(struct mx_mysql *mysql, struct mxq_daemon *daemon, uint64_t job_id) {

/* set a job from LOADED back to INQ. This needs to reset what
* mxq_assign_job_from_group_to_daemon() and mxq_set_job_status_loaded_on_server()
* did to the job:
*
* mxq_assign_job_from_group_to_daemon() : daemon_id, host_hostname, host_slots, server_id, job_status
* mxq_set_job_status_loaded_on_server() : host_id, job_status
*
* Only to be used as an error path, if we fail after loading a job during job setup
* before any users code was executed (with possible user-visible side effects)
*/

struct mx_mysql_bind param = {0};
int res;

char *query =
"UPDATE"
" mxq_job"
" SET"
" daemon_id = 0,"
" host_hostname = '',"
" host_slots = 0,"
" server_id = '',"
" host_id = '',"
" job_status = " status_str(MXQ_JOB_STATUS_INQ)
" WHERE"
" job_id = ?"
" AND job_status = " status_str(MXQ_JOB_STATUS_LOADED);

mx_mysql_bind_init_param(&param, 1);
mx_mysql_bind_var(&param, 0, uint64, &(job_id));

res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param);
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}
return res;
}
1 change: 1 addition & 0 deletions mxq_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,5 @@ int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j);
int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mxq_job **jobs_result, uint64_t group_id, struct mxq_daemon *daemon);
int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job);
int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **jobs_result, struct mxq_daemon *daemon);
/* Error path helper: set a job from LOADED back to INQ and clear its
 * daemon_id, host_hostname, host_slots, server_id and host_id columns.
 * Returns a negative value if the database statement fails. */
int mxq_unload_job_from_server(struct mx_mysql *mysql, struct mxq_daemon *daemon, uint64_t job_id);
#endif
Loading

0 comments on commit 7a601af

Please sign in to comment.