diff --git a/Makefile b/Makefile index 0e0af167..cd6fda95 100644 --- a/Makefile +++ b/Makefile @@ -392,6 +392,8 @@ mxqdump.o: $(mx_log.h) mxqdump.o: $(mx_util.h) mxqdump.o: $(mx_mysql.h) mxqdump.o: $(mx_getopt.h) +mxqdump.o: $(mxq_group.h) +mxqdump.o: $(mxq_job.h) mxqdump.o: CFLAGS += $(CFLAGS_MYSQL) clean: CLEAN += mxqdump.o diff --git a/mxq_job.c b/mxq_job.c index d71b26e8..32f56067 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -14,10 +14,11 @@ #include "mx_log.h" #include "mx_util.h" +#include "mxq_daemon.h" #include "mxq_group.h" #include "mxq_job.h" -#define JOB_FIELDS_CNT 37 +#define JOB_FIELDS_CNT 38 #define JOB_FIELDS \ " job_id, " \ " job_status, " \ @@ -32,6 +33,7 @@ " job_umask, " \ " host_submit, " \ " host_id, " \ + " daemon_id, " \ " server_id, " \ " host_hostname, " \ " host_pid, " \ @@ -78,7 +80,8 @@ static int bind_result_job_fields(struct mx_mysql_bind *result, struct mxq_job * res += mx_mysql_bind_var(result, idx++, uint32, &(j->job_umask)); res += mx_mysql_bind_var(result, idx++, string, &(j->host_submit)); res += mx_mysql_bind_var(result, idx++, string, &(j->host_id)); - res += mx_mysql_bind_var(result, idx++, string, &(j->server_id)); + res += mx_mysql_bind_var(result, idx++, uint32, &(j->daemon_id)); + res += mx_mysql_bind_var(result, idx++, string, &(j->daemon_name)); res += mx_mysql_bind_var(result, idx++, string, &(j->host_hostname)); res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_pid)); res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_slots)); @@ -162,7 +165,7 @@ void mxq_job_free_content(struct mxq_job *j) mx_free_null(j->tmp_stderr); mx_free_null(j->host_submit); mx_free_null(j->host_id); - mx_free_null(j->server_id); + mx_free_null(j->daemon_name); mx_free_null(j->host_hostname); mx_free_null(j->host_cpu_set_str); } @@ -288,73 +291,89 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job ** return res; } -int mxq_assign_job_from_group_to_server(struct mx_mysql *mysql, uint64_t group_id, char 
*hostname, char *server_id) +int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon) { - int res; struct mx_mysql_bind param = {0}; + int res; + int idx; assert(mysql); - assert(hostname); - assert(*hostname); - assert(server_id); - assert(*server_id); + assert(daemon); + assert(daemon->hostname); + assert(*daemon->hostname); + assert(daemon->daemon_name); + assert(*daemon->daemon_name); + assert(daemon->daemon_id); char *query = - "UPDATE mxq_job SET" - " job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) "," - " host_hostname = ?," - " server_id = ?" - " WHERE group_id = ?" - " AND job_status = " status_str(MXQ_JOB_STATUS_INQ) - " AND host_hostname = ''" - " AND server_id = ''" - " AND host_pid = 0" - " ORDER BY job_priority, job_id" + "UPDATE " + " mxq_job" + " SET" + " daemon_id = ?," + " host_hostname = ?," + " server_id = ?," + " job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) + " WHERE group_id = ?" + " AND job_status = " status_str(MXQ_JOB_STATUS_INQ) + " AND daemon_id = 0" + " AND host_hostname = ''" + " AND server_id = ''" + " AND host_pid = 0" + " ORDER BY" + " job_priority," + " job_id" " LIMIT 1"; - res = mx_mysql_bind_init_param(&param, 3); + res = mx_mysql_bind_init_param(&param, 4); assert(res == 0); + idx = 0; res = 0; - res += mx_mysql_bind_var(&param, 0, string, &hostname); - res += mx_mysql_bind_var(&param, 1, string, &server_id); - res += mx_mysql_bind_var(&param, 2, uint64, &group_id); + res += mx_mysql_bind_var(&param, idx++, uint32, &daemon->daemon_id); + res += mx_mysql_bind_var(&param, idx++, string, &daemon->hostname); + res += mx_mysql_bind_var(&param, idx++, string, &daemon->daemon_name); + res += mx_mysql_bind_var(&param, idx++, uint64, &group_id); assert(res == 0); res = mx_mysql_do_statement_noresult(mysql, query, &param); - if (res < 0) { + if (res < 0) mx_log_err("mx_mysql_do_statement(): %m"); - return res; - } return res; } -int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *server_id) 
+int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, struct mxq_daemon *daemon) { - int res; struct mx_mysql_bind param = {0}; + int res; + int idx; assert(mysql); - assert(hostname); - assert(*hostname); - assert(server_id); - assert(*server_id); + + assert(daemon->hostname); + assert(daemon->daemon_name); + + assert(*daemon->hostname); + assert(*daemon->daemon_name); char *query = - "UPDATE mxq_job SET" - " job_status = " status_str(MXQ_JOB_STATUS_INQ) - " WHERE host_pid = 0" - " AND job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) - " AND host_hostname = ?" - " AND server_id = ?"; + "UPDATE" + " mxq_job" + " SET" + " daemon_id = 0," + " job_status = " status_str(MXQ_JOB_STATUS_INQ) + " WHERE job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) + " AND host_pid = 0" + " AND host_hostname = ?" + " AND server_id = ?"; res = mx_mysql_bind_init_param(&param, 2); assert(res == 0); + idx = 0; res = 0; - res += mx_mysql_bind_var(&param, 0, string, &hostname); - res += mx_mysql_bind_var(&param, 1, string, &server_id); + res += mx_mysql_bind_var(&param, idx++, string, &daemon->hostname); + res += mx_mysql_bind_var(&param, idx++, string, &daemon->daemon_name); assert(res == 0); res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param); @@ -366,32 +385,39 @@ int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *se return res; } -int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job) +int mxq_set_job_status_loaded(struct mx_mysql *mysql, struct mxq_job *job) { - int res; struct mx_mysql_bind param = {0}; + char *host_id; + int res; + int idx; assert(mysql); assert(job); + assert(job->job_id); + assert(job->daemon_id); + + mx_asprintf_forever(&host_id, "%u", job->daemon_id); char *query = - "UPDATE mxq_job SET" - " job_status = " status_str(MXQ_JOB_STATUS_LOADED) - ", host_id = ?" - " WHERE job_id = ?" - " AND job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) - " AND host_hostname = ?" - " AND server_id = ?" 
- " AND host_pid = 0"; + "UPDATE" + " mxq_job" + " SET" + " host_id = ?," + " job_status = " status_str(MXQ_JOB_STATUS_LOADED) + " WHERE job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) + " AND job_id = ?" + " AND daemon_id = ?" + " AND host_pid = 0"; - res = mx_mysql_bind_init_param(&param, 4); + res = mx_mysql_bind_init_param(&param, 3); assert(res == 0); + idx = 0; res = 0; - res += mx_mysql_bind_var(&param, 0, string, &(job->host_id)); - res += mx_mysql_bind_var(&param, 1, uint64, &(job->job_id)); - res += mx_mysql_bind_var(&param, 2, string, &(job->host_hostname)); - res += mx_mysql_bind_var(&param, 3, string, &(job->server_id)); + res += mx_mysql_bind_var(&param, idx++, string, &host_id); + res += mx_mysql_bind_var(&param, idx++, uint64, &(job->job_id)); + res += mx_mysql_bind_var(&param, idx++, uint32, &(job->daemon_id)); assert(res == 0); res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param); @@ -400,6 +426,9 @@ int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job * return res; } + mx_free_null(job->host_id); + + job->host_id = host_id; job->job_status = MXQ_JOB_STATUS_LOADED; return res; @@ -407,39 +436,46 @@ int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job * int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job) { - int res; - struct mx_mysql_bind param = {0}; - assert(mysql); + assert(job); + assert(job->job_id); + assert(job->daemon_id); + assert(job->host_pid); + assert(job->host_slots); + assert(job->host_cpu_set_str); - if (job->job_status != MXQ_JOB_STATUS_LOADED) { - mx_log_warning("new status==runnning but old status(=%d) is != loaded ", job->job_status); - } + assert(*job->host_cpu_set_str); - char *query = - "UPDATE mxq_job SET" - " job_status = " status_str(MXQ_JOB_STATUS_RUNNING) "," - " date_start = NULL," - " host_pid = ?," - " host_slots = ?," - " host_cpu_set = ?" - " WHERE job_id = ?" - " AND job_status = " status_str(MXQ_JOB_STATUS_LOADED) - " AND host_hostname = ?" 
- " AND server_id = ?" - " AND host_pid = 0"; + struct mx_mysql_bind param = {0}; + int res; + int idx; - res = mx_mysql_bind_init_param(&param, 6); + char *query = + "UPDATE" + " mxq_job" + " SET" + " host_pid = ?," + " host_slots = ?," + " host_cpu_set = ?," + " date_start = NULL," + " job_status = " status_str(MXQ_JOB_STATUS_RUNNING) + " WHERE job_status = " status_str(MXQ_JOB_STATUS_LOADED) + " AND job_id = ?" + " AND daemon_id = ?" + " AND host_pid = 0"; + + res = mx_mysql_bind_init_param(&param, 5); assert(res == 0); - res = 0; - res += mx_mysql_bind_var(&param, 0, uint32, &(job->host_pid)); - res += mx_mysql_bind_var(&param, 1, uint32, &(job->host_slots)); - res += mx_mysql_bind_var(&param, 2, string, &(job->host_cpu_set_str)); - res += mx_mysql_bind_var(&param, 3, uint64, &(job->job_id)); - res += mx_mysql_bind_var(&param, 4, string, &(job->host_hostname)); - res += mx_mysql_bind_var(&param, 5, string, &(job->server_id)); + idx = 0; + res = 0; + res += mx_mysql_bind_var(&param, idx++, uint32, &(job->host_pid)); + res += mx_mysql_bind_var(&param, idx++, uint32, &(job->host_slots)); + res += mx_mysql_bind_var(&param, idx++, string, &(job->host_cpu_set_str)); + + res += mx_mysql_bind_var(&param, idx++, uint64, &(job->job_id)); + res += mx_mysql_bind_var(&param, idx++, uint32, &(job->daemon_id)); assert(res == 0); res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param); @@ -461,8 +497,9 @@ int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job) struct mx_mysql_bind param = {0}; assert(mysql); - assert(job); + assert(job->daemon_id); + assert(job->host_pid); if (WIFEXITED(job->stats_status)) { if (WEXITSTATUS(job->stats_status)) { @@ -478,42 +515,41 @@ int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job) return -1; } - if (job->job_status != MXQ_JOB_STATUS_RUNNING && job->job_status != MXQ_JOB_STATUS_KILLING) { - mx_log_warning("new status==exited but old status(=%d) is != running ", job->job_status); - } - char *query = - "UPDATE mxq_job SET" - " job_status = 
?," - " date_end = NULL," - " stats_max_sumrss = ?, " - " stats_status = ?, " - " stats_utime_sec = ?, " - " stats_utime_usec = ?, " - " stats_stime_sec = ?, " - " stats_stime_usec = ?, " - " stats_real_sec = ?, " - " stats_real_usec = ?, " - " stats_maxrss = ?, " - " stats_minflt = ?, " - " stats_majflt = ?, " - " stats_nswap = ?, " - " stats_inblock = ?, " - " stats_oublock = ?, " - " stats_nvcsw = ?, " - " stats_nivcsw = ?" - " WHERE job_id = ?" - " AND job_status IN (" status_str(MXQ_JOB_STATUS_LOADED) ", " status_str(MXQ_JOB_STATUS_RUNNING) ", " status_str(MXQ_JOB_STATUS_KILLING) ")" - " AND host_hostname = ?" - " AND server_id = ?" - " AND host_pid = ?"; - - res = mx_mysql_bind_init_param(&param, 21); + "UPDATE" + " mxq_job" + " SET" + " stats_max_sumrss = ?," + " stats_status = ?," + " stats_utime_sec = ?," + " stats_utime_usec = ?," + " stats_stime_sec = ?," + " stats_stime_usec = ?," + " stats_real_sec = ?," + " stats_real_usec = ?," + " stats_maxrss = ?," + " stats_minflt = ?," + " stats_majflt = ?," + " stats_nswap = ?," + " stats_inblock = ?," + " stats_oublock = ?," + " stats_nvcsw = ?," + " stats_nivcsw = ?," + " job_status = ?," + " date_end = NULL" + " WHERE job_status IN (" + status_str(MXQ_JOB_STATUS_LOADED) "," + status_str(MXQ_JOB_STATUS_RUNNING) "," + status_str(MXQ_JOB_STATUS_KILLING) ")" + " AND job_id = ?" + " AND daemon_id = ?" 
+ " AND host_pid = ?"; + + res = mx_mysql_bind_init_param(&param, 20); assert(res == 0); idx = 0; res = 0; - res += mx_mysql_bind_var(&param, idx++, uint16, &(newstatus)); res += mx_mysql_bind_var(&param, idx++, uint64, &(job->stats_max_sumrss)); res += mx_mysql_bind_var(&param, idx++, int32, &(job->stats_status)); res += mx_mysql_bind_var(&param, idx++, int64, &(job->stats_rusage.ru_utime.tv_sec)); @@ -530,9 +566,9 @@ int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job) res += mx_mysql_bind_var(&param, idx++, int64, &(job->stats_rusage.ru_oublock)); res += mx_mysql_bind_var(&param, idx++, int64, &(job->stats_rusage.ru_nvcsw)); res += mx_mysql_bind_var(&param, idx++, int64, &(job->stats_rusage.ru_nivcsw)); + res += mx_mysql_bind_var(&param, idx++, uint16, &(newstatus)); res += mx_mysql_bind_var(&param, idx++, uint64, &(job->job_id)); - res += mx_mysql_bind_var(&param, idx++, string, &(job->host_hostname)); - res += mx_mysql_bind_var(&param, idx++, string, &(job->server_id)); + res += mx_mysql_bind_var(&param, idx++, uint32, &(job->daemon_id)); res += mx_mysql_bind_var(&param, idx++, uint32, &(job->host_pid)); assert(res == 0); @@ -586,7 +622,7 @@ int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j) mx_asprintf_forever(&j->tmp_stdout, "%s/mxq.%u.%lu.%lu.%s.%s.%d.stdout.tmp", dir, g->user_uid, g->group_id, j->job_id, j->host_hostname, - j->server_id, j->host_pid); + j->daemon_name, j->host_pid); } if (!mx_streq(j->job_stderr, "/dev/null")) { @@ -600,136 +636,131 @@ int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j) mx_asprintf_forever(&j->tmp_stderr, "%s/mxq.%u.%lu.%lu.%s.%s.%d.stderr.tmp", dir, g->user_uid, g->group_id, j->job_id, j->host_hostname, - j->server_id, j->host_pid); + j->daemon_name, j->host_pid); } return 1; } -int mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t group_id, char *hostname, char *server_id) +int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mxq_job 
**jobs_result, uint64_t group_id, struct mxq_daemon *daemon) { - int res; - struct mxq_job *jobs = NULL; + struct mxq_job *jobs_tmp = NULL; struct mx_mysql_bind param = {0}; + int res; + int idx; assert(mysql); - assert(mxq_jobs); - assert(!(*mxq_jobs)); - assert(hostname); - assert(*hostname); - assert(server_id); - assert(*server_id); + assert(jobs_result); + assert(!(*jobs_result)); + assert(daemon); + assert(daemon->daemon_id); char *query = "SELECT" JOB_FIELDS - " FROM mxq_job" + " FROM" + " mxq_job" " WHERE job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) - " AND host_hostname = ?" - " AND server_id = ?" - " AND group_id = ?" + " AND group_id = ?" + " AND daemon_id = ?" " LIMIT 1"; - res = mx_mysql_bind_init_param(&param, 3); + res = mx_mysql_bind_init_param(&param, 2); assert(res == 0); + idx = 0; res = 0; - res += mx_mysql_bind_var(&param, 0, string, &hostname); - res += mx_mysql_bind_var(&param, 1, string, &server_id); - res += mx_mysql_bind_var(&param, 2, uint64, &group_id); + res += mx_mysql_bind_var(&param, idx++, uint64, &group_id); + res += mx_mysql_bind_var(&param, idx++, uint32, &daemon->daemon_id); assert(res == 0); - res=do_jobs_statement(mysql, query, &param, &jobs); - if (res < 0) { - return res; - } + res = do_jobs_statement(mysql, query, &param, &jobs_tmp); + if (res >= 0) + *jobs_result = jobs_tmp; - *mxq_jobs = jobs; return res; } -int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id, char *host_id) +int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *job, uint64_t group_id, struct mxq_daemon *daemon) { int res; - struct mxq_job *jobs = NULL; + struct mxq_job *jobs_tmp = NULL; assert(mysql); - assert(mxqjob); - assert(hostname); - assert(*hostname); - assert(server_id); - assert(*server_id); - assert(host_id); - assert(*host_id); + assert(job); + assert(daemon); do { - res = mxq_load_job_from_group_assigned_to_server(mysql, &jobs, group_id, hostname, server_id); + 
res = mxq_load_job_from_group_assigned_to_daemon(mysql, &jobs_tmp, group_id, daemon); if(res < 0) { - mx_log_err(" group_id=%lu :: mxq_load_job_from_group_assigned_to_server: %m", group_id); + mx_log_err(" group_id=%lu :: mxq_load_job_from_group_assigned_to_daemon: %m", group_id); return 0; } if(res == 1) { - memcpy(mxqjob, &jobs[0], sizeof(*mxqjob)); - free(jobs); + memcpy(job, &jobs_tmp[0], sizeof(*job)); + mx_free_null(jobs_tmp); break; } - res = mxq_assign_job_from_group_to_server(mysql, group_id, hostname, server_id); + res = mxq_assign_job_from_group_to_daemon(mysql, group_id, daemon); if (res < 0) { - mx_log_err(" group_id=%lu :: mxq_assign_job_from_group_to_server(): %m", group_id); + mx_log_err(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): %m", group_id); return 0; } if (res == 0) { - mx_log_warning(" group_id=%lu :: mxq_assign_job_from_group_to_server(): No matching job found - maybe another server was a bit faster. ;)", group_id); + mx_log_warning(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): No matching job found - maybe another server was a bit faster. 
;)", group_id); return 0; } } while (1); - mx_free_null(mxqjob->host_id); - mxqjob->host_id = mx_strdup_forever(host_id); - - res = mxq_set_job_status_loaded_on_server(mysql, mxqjob); + res = mxq_set_job_status_loaded(mysql, job); if (res < 0) { - mx_log_err(" group_id=%lu job_id=%lu :: mxq_set_job_status_loaded_on_server(): %m", group_id, mxqjob->job_id); + mx_log_err(" group_id=%lu job_id=%lu :: mxq_set_job_status_loaded_on_server(): %m", group_id, job->job_id); return 0; } if (res == 0) { - mx_log_err(" group_id=%lu job_id=%lu :: mxq_set_job_status_loaded_on_server(): Job not found", group_id, mxqjob->job_id); + mx_log_err(" group_id=%lu job_id=%lu :: mxq_set_job_status_loaded_on_server(): Job not found", group_id, job->job_id); return 0; } - mxqjob->job_status = MXQ_JOB_STATUS_LOADED; + job->job_status = MXQ_JOB_STATUS_LOADED; return 1; } -int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id) +int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **jobs_result, struct mxq_daemon *daemon) { + struct mxq_job *jobs_tmp = NULL; + struct mx_mysql_bind param = {0}; + int idx; int res; - struct mxq_job *jobs = NULL; - struct mx_mysql_bind param = {0}; + assert(daemon); + assert(daemon->hostname); + assert(daemon->daemon_name); + assert(*daemon->hostname); + assert(*daemon->daemon_name); char *query = "SELECT" JOB_FIELDS - " FROM mxq_job" - " WHERE job_status = " status_str(MXQ_JOB_STATUS_RUNNING) - " AND host_hostname=?" - " AND server_id=?"; + " FROM" + " mxq_job" + " WHERE job_status = " status_str(MXQ_JOB_STATUS_RUNNING) + " AND host_hostname = ?" 
+ " AND server_id = ?"; res = mx_mysql_bind_init_param(&param, 2); assert(res == 0); - res=0; - res += mx_mysql_bind_var(&param, 0, string, &hostname); - res += mx_mysql_bind_var(&param, 1, string, &server_id); + idx = 0; + res = 0; + res += mx_mysql_bind_var(&param, idx++, string, &daemon->hostname); + res += mx_mysql_bind_var(&param, idx++, string, &daemon->daemon_name); assert(res == 0); - res=do_jobs_statement(mysql, query, &param, &jobs); - if (res < 0) { - return res; - } + res=do_jobs_statement(mysql, query, &param, &jobs_tmp); + if (res >= 0) + *jobs_result = jobs_tmp; - *mxq_jobs = jobs; return res; } diff --git a/mxq_job.h b/mxq_job.h index fd7e1f14..e50c3b86 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -9,6 +9,7 @@ #include +#include "mxq_daemon.h" #include "mxq_group.h" struct mxq_job { @@ -38,7 +39,8 @@ struct mxq_job { char * host_submit; char * host_id; - char * server_id; + uint32_t daemon_id; + char * daemon_name; char * host_hostname; @@ -99,15 +101,14 @@ void mxq_job_free_content(struct mxq_job *j); int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id); int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp); int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status); -int mxq_assign_job_from_group_to_server(struct mx_mysql *mysql, uint64_t group_id, char *hostname, char *server_id); -int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *server_id); +int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon); +int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, struct mxq_daemon *daemon); int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job); int 
mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job); int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j); -int mxq_load_job_from_group_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t group_id, char *hostname, char *server_id); -int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id, char *host_id); -int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id); - +int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mxq_job **jobs_result, uint64_t group_id, struct mxq_daemon *daemon); +int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon); +int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **jobs_result, struct mxq_daemon *daemon); #endif diff --git a/mxqd.c b/mxqd.c index babe9f30..58123a44 100644 --- a/mxqd.c +++ b/mxqd.c @@ -514,23 +514,23 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) } server->hostname = arg_hostname; - server->server_id = arg_daemon_name; + server->daemon_name = arg_daemon_name; server->initial_path = arg_initial_path; server->initial_tmpdir = arg_initial_tmpdir; server->recoveronly = arg_recoveronly; - server->flock = mx_flock(LOCK_EX, "/dev/shm/mxqd.%s.%s.lck", server->hostname, server->server_id); + server->flock = mx_flock(LOCK_EX, "/dev/shm/mxqd.%s.%s.lck", server->hostname, server->daemon_name); if (!server->flock) { - mx_log_err("mx_flock(/dev/shm/mxqd.%s.%s.lck) failed: %m", server->hostname, server->server_id); + mx_log_err("mx_flock(/dev/shm/mxqd.%s.%s.lck) failed: %m", server->hostname, server->daemon_name); return -EX_UNAVAILABLE; } if (!server->flock->locked) { - mx_log_err("MXQ Server '%s' on host '%s' is already running. 
Exiting.", server->server_id, server->hostname); + mx_log_err("MXQ Server '%s' on host '%s' is already running. Exiting.", server->daemon_name, server->hostname); return -EX_UNAVAILABLE; } - mx_asprintf_forever(&server->finished_jobsdir,"%s/%s",MXQ_FINISHED_JOBSDIR,server->server_id); + mx_asprintf_forever(&server->finished_jobsdir,"%s/%s",MXQ_FINISHED_JOBSDIR,server->daemon_name); res=mx_mkdir_p(server->finished_jobsdir,0700); if (res<0) { mx_log_err("MAIN: mkdir %s failed: %m. Exiting.",MXQ_FINISHED_JOBSDIR); @@ -754,7 +754,7 @@ static int init_child_process(struct mxq_group_list *glist, struct mxq_job *job) mx_setenvf_forever("MXQ_TIME", "%d", group->job_time); mx_setenv_forever("MXQ_HOSTID", server->host_id); mx_setenv_forever("MXQ_HOSTNAME", server->hostname); - mx_setenv_forever("MXQ_SERVERID", server->server_id); + mx_setenv_forever("MXQ_SERVERID", server->daemon_name); fh = open("/proc/self/loginuid", O_WRONLY|O_TRUNC); if (fh == -1) { @@ -1116,6 +1116,8 @@ unsigned long start_job(struct mxq_group_list *glist) struct mxq_group *group; + struct mxq_daemon *daemon; + pid_t pid; int res; @@ -1124,10 +1126,11 @@ unsigned long start_job(struct mxq_group_list *glist) assert(glist->user->server); server = glist->user->server; + daemon = &server->daemon; group = &glist->group; job = &_mxqjob; - res = mxq_load_job_from_group_for_server(server->mysql, job, group->group_id, server->hostname, server->server_id, server->host_id); + res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon); if (!res) { return 0; } @@ -2017,7 +2020,10 @@ static int lost_scan(struct mxq_server *server) static int load_running_jobs(struct mxq_server *server) { + assert(server); + _mx_cleanup_free_ struct mxq_job *jobs = NULL; + struct mxq_daemon *daemon = &server->daemon; struct mxq_job_list *jlist; struct mxq_group_list *glist; @@ -2028,7 +2034,7 @@ static int load_running_jobs(struct mxq_server *server) int j; - job_cnt = 
mxq_load_jobs_running_on_server(server->mysql, &jobs, server->hostname, server->server_id); + job_cnt = mxq_load_jobs_running_on_server(server->mysql, &jobs, daemon); if (job_cnt < 0) return job_cnt; @@ -2201,21 +2207,22 @@ int load_running_groups(struct mxq_server *server) int recover_from_previous_crash(struct mxq_server *server) { - int res; - assert(server); assert(server->mysql); assert(server->hostname); - assert(server->server_id); + assert(server->daemon_name); + + int res; + struct mxq_daemon *daemon = &server->daemon; - res = mxq_unassign_jobs_of_server(server->mysql, server->hostname, server->server_id); + res = mxq_unassign_jobs_of_server(server->mysql, daemon); if (res < 0) { mx_log_info("mxq_unassign_jobs_of_server() failed: %m"); return res; } if (res > 0) - mx_log_info("hostname=%s server_id=%s :: recovered from previous crash: unassigned %d jobs.", - server->hostname, server->server_id, res); + mx_log_info("hostname=%s daemon_name=%s :: recovered from previous crash: unassigned %d jobs.", + server->hostname, server->daemon_name, res); res = load_running_groups(server); mx_log_info("recover: %d running groups loaded.", res); diff --git a/mxqd.h b/mxqd.h index 15b35125..fb66eddc 100644 --- a/mxqd.h +++ b/mxqd.h @@ -94,7 +94,7 @@ struct mxq_server { unsigned long long int starttime; char *host_id; char *hostname; - char *server_id; + char *daemon_name; char *pidfilename; char *finished_jobsdir; struct mx_flock *flock; diff --git a/mxqdump.c b/mxqdump.c index 2d6ed40f..28e9f7bf 100644 --- a/mxqdump.c +++ b/mxqdump.c @@ -238,7 +238,7 @@ static int print_job(struct mxq_group *g, struct mxq_job *j) j->job_id, j->host_pid, j->host_hostname, - j->server_id, + j->daemon_name, g->group_name, wait_sec, run_sec,