Skip to content

Commit

Permalink
Merge remote-tracking branch 'donald/reaper' into issues/issue30
Browse files Browse the repository at this point in the history
see mariux64#30

* donald/reaper:
  mxqd: reaper: ignore signals from mxqd
  mxqd: set cpu_set_running in group_add_job
  database: store and retrieve cpuset of job
  mxq_job: add a string version of host_cpu_set
  mxq_job: refactor (add do_jobs_statement)
  mxqd: do not finish jobs from signals when we have reaper output
  mxqd: better loglevels for killall_over_time
  mxqd: remove unused member
  mxqd: let reaper call setsid instread of user process
  mxqd: add job_is_lost
  mxqd: do not kill children in catchall
  mxq_job: add mxq_set_job_status_unknown
  mxqd: add SIGQUIT processing : do not kill or wait for children
  mxqd: let recover_from_previous_crash rebuild state for previous jobs
  mxqd: refactor (add reset_signals)
  mxq_job: add mxq_load_jobs_running_on_server
  mxqd: stop recover_from_previous_crash from deleting running jobs
  mxqd: add reaper
  mxqd: add help functions for fspool (finished job spool directory)
  mxqd: create MXQ_FINISHED_JOBSDIR on startup
  make: add FINISHED_JOBSDIR
  mx_util: add mx_mkdir_p
  mxqd: refactor (add job_has_finished)
  mxqd: refactor (add user_process)
  • Loading branch information
mariux committed Nov 3, 2015
2 parents 8fffb79 + f23144e commit 207fb28
Show file tree
Hide file tree
Showing 10 changed files with 789 additions and 212 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,15 @@ MXQ_MYSQL_DEFAULT_GROUP_DEVELOPMENT = mxqdevel
MXQ_INITIAL_PATH = /sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin
MXQ_INITIAL_TMPDIR = /tmp

MXQ_FINISHED_JOBSDIR = ${LOCALSTATEDIR}/spool/mxqd

CFLAGS_MXQ_MYSQL_DEFAULT_FILE = -DMXQ_MYSQL_DEFAULT_FILE=\"$(MXQ_MYSQL_DEFAULT_FILE)\"
CFLAGS_MXQ_MYSQL_DEFAULT_GROUP = -DMXQ_MYSQL_DEFAULT_GROUP_CLIENT=\"$(MXQ_MYSQL_DEFAULT_GROUP_CLIENT)\"
CFLAGS_MXQ_MYSQL_DEFAULT_GROUP += -DMXQ_MYSQL_DEFAULT_GROUP_SERVER=\"$(MXQ_MYSQL_DEFAULT_GROUP_SERVER)\"
CFLAGS_MXQ_MYSQL_DEFAULT_GROUP += -DMXQ_MYSQL_DEFAULT_GROUP_DEVELOPMENT=\"$(MXQ_MYSQL_DEFAULT_GROUP_DEVELOPMENT)\"
CFLAGS_MXQ_INITIAL_PATH = -DMXQ_INITIAL_PATH=\"$(MXQ_INITIAL_PATH)\"
CFLAGS_MXQ_INITIAL_TMPDIR = -DMXQ_INITIAL_TMPDIR=\"$(MXQ_INITIAL_TMPDIR)\"
CFLAGS_MXQ_FINISHED_JOBSDIR = -DMXQ_FINISHED_JOBSDIR=\"${MXQ_FINISHED_JOBSDIR}\"

MYSQL_CONFIG = mysql_config

Expand Down Expand Up @@ -417,6 +420,7 @@ mxqd.o: $(mx_mysql.h)
mxqd.o: CFLAGS += $(CFLAGS_MYSQL)
mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_PATH)
mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_TMPDIR)
mxqd.o: CFLAGS += $(CFLAGS_MXQ_FINISHED_JOBSDIR)
mxqd.o: CFLAGS += -Wno-unused-but-set-variable

clean: CLEAN += mxqd.o
Expand Down
31 changes: 29 additions & 2 deletions mx_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

#include <ctype.h>

//#include <sys/types.h>
//#include <sys/stat.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include "mx_log.h"
Expand Down Expand Up @@ -1272,3 +1272,30 @@ char *mx_cpuset_to_str(cpu_set_t* cpuset_ptr)
mx_strvec_free(strvec);
return out;
}

int mx_mkdir_p(char *path, mode_t mode)
{
struct stat st;
int err;
char *d;
_mx_cleanup_free_ char *copy = NULL;

if (stat(path, &st) == 0)
return 0;

copy=mx_strdup_forever(path);
d=copy;

while (*++d == '/');

while ((d = strchr(d, '/'))) {
*d = '\0';
err = stat(copy, &st) && mkdir(copy, mode);
*d++ = '/';
if (err)
return -errno;
while (*d == '/')
++d;
}
return (stat(copy, &st) && mkdir(copy, mode)) ? -errno : 0;
}
3 changes: 3 additions & 0 deletions mx_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,7 @@ char* mx_strvec_join(char *sep,char **strvec);
char* mx_cpuset_to_str(cpu_set_t* cpuset_ptr);
int mx_str_to_cpuset(cpu_set_t* cpuset_ptr,char *str);

int mx_mkdir_p(char *path, mode_t mode);


#endif
125 changes: 91 additions & 34 deletions mxq_job.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

#include "mx_util.h"
#include "mx_log.h"
#include "mx_util.h"

#include "mxq_group.h"
#include "mxq_job.h"

#define JOB_FIELDS_CNT 36
#define JOB_FIELDS_CNT 37
#define JOB_FIELDS \
" job_id, " \
" job_status, " \
Expand All @@ -35,6 +36,7 @@
" host_hostname, " \
" host_pid, " \
" host_slots, " \
" host_cpu_set, " \
" UNIX_TIMESTAMP(date_submit) as date_submit, " \
" UNIX_TIMESTAMP(date_start) as date_start, " \
" UNIX_TIMESTAMP(date_end) as date_end, " \
Expand Down Expand Up @@ -80,6 +82,7 @@ static int bind_result_job_fields(struct mx_mysql_bind *result, struct mxq_job *
res += mx_mysql_bind_var(result, idx++, string, &(j->host_hostname));
res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_pid));
res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_slots));
res += mx_mysql_bind_var(result, idx++, string, &(j->host_cpu_set_str));
res += mx_mysql_bind_var(result, idx++, int64, &(j->date_submit));
res += mx_mysql_bind_var(result, idx++, int64, &(j->date_start));
res += mx_mysql_bind_var(result, idx++, int64, &(j->date_end));
Expand Down Expand Up @@ -158,19 +161,37 @@ void mxq_job_free_content(struct mxq_job *j)
mx_free_null(j->tmp_stderr);
mx_free_null(j->host_submit);
mx_free_null(j->host_id);
mx_free_null(j->host_cpu_set_str);
mx_free_null(j->server_id);
mx_free_null(j->host_hostname);
mx_free_null(j->job_argv);
j->job_argv = NULL;
}

static int do_jobs_statement(struct mx_mysql *mysql, char *query, struct mx_mysql_bind *param, struct mxq_job **jobs)
{
int res,i;
struct mxq_job j = {0};
struct mx_mysql_bind result = {0};

res = bind_result_job_fields(&result, &j);
assert(res == 0);

res = mx_mysql_do_statement(mysql, query, param, &result, &j, (void **)jobs, sizeof(**jobs));
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}
for (i=0;i<res;i++)
mx_str_to_cpuset(&(*jobs)[i].host_cpu_set,(*jobs)[i].host_cpu_set_str);
return res;
}

int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id)
{
int res;
struct mxq_job *jobs = NULL;
struct mxq_job j = {0};
struct mx_mysql_bind param = {0};
struct mx_mysql_bind result = {0};

assert(mysql);
assert(mxq_jobs);
Expand All @@ -189,26 +210,22 @@ int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job
res = mx_mysql_bind_var(&param, 0, uint64, &job_id);
assert(res == 0);

res = bind_result_job_fields(&result, &j);
assert(res == 0);

res = mx_mysql_do_statement(mysql, query, &param, &result, &j, (void **)&jobs, sizeof(*jobs));
res=do_jobs_statement(mysql, query, &param, &jobs);
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}

*mxq_jobs = jobs;
return res;
}



int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp)
{
int res;
struct mxq_job *jobs = NULL;
struct mxq_job j = {0};
struct mx_mysql_bind param = {0};
struct mx_mysql_bind result = {0};

assert(mysql);
assert(mxq_jobs);
Expand All @@ -227,12 +244,8 @@ int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, st
res = mx_mysql_bind_var(&param, 0, uint64, &(grp->group_id));
assert(res == 0);

res = bind_result_job_fields(&result, &j);
assert(res == 0);

res = mx_mysql_do_statement(mysql, query, &param, &result, &j, (void **)&jobs, sizeof(*jobs));
res=do_jobs_statement(mysql, query, &param, &jobs);
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}

Expand All @@ -244,9 +257,7 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **
{
int res;
struct mxq_job *jobs = NULL;
struct mxq_job j = {0};
struct mx_mysql_bind param = {0};
struct mx_mysql_bind result = {0};

assert(mysql);
assert(mxq_jobs);
Expand All @@ -268,12 +279,8 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **
res += mx_mysql_bind_var(&param, 1, uint64, &job_status);
assert(res == 0);

res = bind_result_job_fields(&result, &j);
assert(res == 0);

res = mx_mysql_do_statement(mysql, query, &param, &result, &j, (void **)&jobs, sizeof(*jobs));
res=do_jobs_statement(mysql, query, &param, &jobs);
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}

Expand Down Expand Up @@ -415,22 +422,24 @@ int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job)
" job_status = " status_str(MXQ_JOB_STATUS_RUNNING) ","
" date_start = NULL,"
" host_pid = ?,"
" host_slots = ?"
" host_slots = ?,"
" host_cpu_set = ?"
" WHERE job_id = ?"
" AND job_status = " status_str(MXQ_JOB_STATUS_LOADED)
" AND host_hostname = ?"
" AND server_id = ?"
" AND host_pid = 0";

res = mx_mysql_bind_init_param(&param, 5);
res = mx_mysql_bind_init_param(&param, 6);
assert(res == 0);

res = 0;
res += mx_mysql_bind_var(&param, 0, uint32, &(job->host_pid));
res += mx_mysql_bind_var(&param, 1, uint32, &(job->host_slots));
res += mx_mysql_bind_var(&param, 2, uint64, &(job->job_id));
res += mx_mysql_bind_var(&param, 3, string, &(job->host_hostname));
res += mx_mysql_bind_var(&param, 4, string, &(job->server_id));
res += mx_mysql_bind_var(&param, 2, string, &(job->host_cpu_set_str));
res += mx_mysql_bind_var(&param, 3, uint64, &(job->job_id));
res += mx_mysql_bind_var(&param, 4, string, &(job->host_hostname));
res += mx_mysql_bind_var(&param, 5, string, &(job->server_id));
assert(res == 0);

res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param);
Expand Down Expand Up @@ -573,6 +582,29 @@ int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname
return res;
}

int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job)
{
int res;
struct mx_mysql_bind param = {0};

char *query =
"UPDATE mxq_job SET"
" job_status = " status_str(MXQ_JOB_STATUS_UNKNOWN)
" WHERE job_id = ?";

res = mx_mysql_bind_init_param(&param, 1);
res += mx_mysql_bind_var(&param, 0, uint64, &job->job_id);
assert(res == 0);

res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param);
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}

return res;
}

int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j)
{
if (!mx_streq(j->job_stdout, "/dev/null")) {
Expand Down Expand Up @@ -605,9 +637,7 @@ int mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mx
{
int res;
struct mxq_job *jobs = NULL;
struct mxq_job j = {0};
struct mx_mysql_bind param = {0};
struct mx_mysql_bind result = {0};

assert(mysql);
assert(mxq_jobs);
Expand Down Expand Up @@ -636,12 +666,8 @@ int mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mx
res += mx_mysql_bind_var(&param, 2, uint64, &group_id);
assert(res == 0);

res = bind_result_job_fields(&result, &j);
assert(res == 0);

res = mx_mysql_do_statement(mysql, query, &param, &result, &j, (void **)&jobs, sizeof(*jobs));
res=do_jobs_statement(mysql, query, &param, &jobs);
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}

Expand Down Expand Up @@ -704,3 +730,34 @@ int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *m

return 1;
}

int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id)
{
int res;
struct mxq_job *jobs = NULL;

struct mx_mysql_bind param = {0};

char *query =
"SELECT"
JOB_FIELDS
" FROM mxq_job"
" WHERE job_status = " status_str(MXQ_JOB_STATUS_RUNNING)
" AND host_hostname=?"
" AND server_id=?";
res = mx_mysql_bind_init_param(&param, 2);
assert(res == 0);

res=0;
res += mx_mysql_bind_var(&param, 0, string, &hostname);
res += mx_mysql_bind_var(&param, 1, string, &server_id);
assert(res == 0);

res=do_jobs_statement(mysql, query, &param, &jobs);
if (res < 0) {
return res;
}

*mxq_jobs = jobs;
return res;
}
3 changes: 3 additions & 0 deletions mxq_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct mxq_job {
uint32_t host_pid;
uint32_t host_slots;
cpu_set_t host_cpu_set;
char * host_cpu_set_str;

int64_t date_submit;
int64_t date_start;
Expand Down Expand Up @@ -103,9 +104,11 @@ int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *se
int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname, char *server_id);
int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j);
int mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t group_id, char *hostname, char *server_id);
int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id, char *host_id);
int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id);

#endif
Loading

0 comments on commit 207fb28

Please sign in to comment.