From f1ca63e6239980db4916f5f38afab684734422f0 Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Wed, 29 Jul 2015 17:10:48 +0200 Subject: [PATCH] mxqd: Recover from previous server crash fixes https://github.molgen.mpg.de/mariux64/mxq/issues/8 - unassign already assigned jobs - change status to unknown for loaded and/or running jobs --- mxq_job.c | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ mxq_job.h | 2 ++ mxqd.c | 45 +++++++++++++++++++++++++++++++--- 3 files changed, 116 insertions(+), 3 deletions(-) diff --git a/mxq_job.c b/mxq_job.c index d85a540..21ff955 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -338,6 +338,42 @@ int mxq_assign_job_from_group_to_server(struct mx_mysql *mysql, uint64_t group_i return res; } +int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *server_id) +{ + int res; + struct mx_mysql_bind param = {0}; + + assert(mysql); + assert(hostname); + assert(*hostname); + assert(server_id); + assert(*server_id); + + char *query = + "UPDATE mxq_job SET" + " job_status = " status_str(MXQ_JOB_STATUS_INQ) + " WHERE host_pid = 0" + " AND job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) + " AND host_hostname = ?" + " AND server_id = ?"; + + res = mx_mysql_bind_init_param(¶m, 2); + assert(res == 0); + + res = 0; + res += mx_mysql_bind_var(¶m, 0, string, &hostname); + res += mx_mysql_bind_var(¶m, 1, string, &server_id); + assert(res == 0); + + res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, ¶m); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + return res; +} + int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job) { int res; @@ -510,6 +546,42 @@ int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job) return res; } +int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname, char *server_id) +{ + int res; + struct mx_mysql_bind param = {0}; + + assert(mysql); + assert(hostname); + assert(*hostname); + assert(server_id); + assert(*server_id); + + char *query = + "UPDATE mxq_job SET" + " job_status = " status_str(MXQ_JOB_STATUS_UNKNOWN) "," + " date_end = NULL" + " WHERE job_status IN (" status_str(MXQ_JOB_STATUS_LOADED) "," status_str(MXQ_JOB_STATUS_RUNNING) ")" + " AND host_hostname = ?" + " AND server_id = ?"; + + res = mx_mysql_bind_init_param(¶m, 2); + assert(res == 0); + + res = 0; + res += mx_mysql_bind_var(¶m, 0, string, &hostname); + res += mx_mysql_bind_var(¶m, 1, string, &server_id); + assert(res == 0); + + res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, ¶m); + if (res < 0) { + mx_log_err("mx_mysql_do_statement(): %m"); + return res; + } + + return res; +} + int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j) { if (!mx_streq(j->job_stdout, "/dev/null")) { diff --git a/mxq_job.h b/mxq_job.h index 3480c95..32e9f32 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -93,9 +93,11 @@ int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp); int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status); int mxq_assign_job_from_group_to_server(struct mx_mysql *mysql, uint64_t group_id, char *hostname, char *server_id); +int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *server_id); int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job); +int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname, char *server_id); int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j); int mxq_load_job_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id); int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id); diff --git a/mxqd.c b/mxqd.c index 034e2c1..ecd1a45 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1552,6 +1552,37 @@ int load_groups(struct mxq_server *server) { return total; } +int recover_from_previous_crash(struct mxq_server *server) +{ + int res1; + int res2; + + assert(server); + assert(server->mysql); + assert(server->hostname); + assert(server->server_id); + + res1 = mxq_unassign_jobs_of_server(server->mysql, server->hostname, server->server_id); + if (res1 < 0) { + mx_log_info("mxq_unassign_jobs_of_server() failed: %m"); + return res1; + } + if (res1 > 0) + mx_log_info("hostname=%s server_id=%s :: recovered from previous crash: unassigned %d jobs.", + server->hostname, server->server_id, res1); + + res2 = mxq_set_job_status_unknown_for_server(server->mysql, server->hostname, server->server_id); + if (res2 < 0) { + mx_log_info("mxq_unassign_jobs_of_server() failed: %m"); + return res2; + } + if (res2 > 0) + mx_log_info("hostname=%s server_id=%s :: recovered from previous crash: set job_status='unknown' for %d jobs.", + server->hostname, server->server_id, res2); + + return res1+res2; +} + /**********************************************************************/ static void no_handler(int sig) {} @@ -1578,6 +1609,7 @@ int main(int argc, char *argv[]) unsigned long slots_returned = 0; int res; + int fail = 0; /*** server init ***/ @@ -1611,7 +1643,15 @@ int main(int argc, char *argv[]) signal(SIGTTOU, SIG_IGN); signal(SIGCHLD, no_handler); - do { + res = recover_from_previous_crash(&server); + if (res < 0) { + mx_log_warning("recover_from_previous_crash() failed. Aborting execution."); + fail = 1; + } + if (res > 0) + mx_log_warning("total %d jobs recovered from previous crash."); + + while (!global_sigint_cnt && !global_sigterm_cnt && !fail) { slots_returned = catchall(&server); if (slots_returned) mx_log_info("slots_returned=%lu :: Main Loop freed %lu slots.", slots_returned, slots_returned); @@ -1658,8 +1698,7 @@ int main(int argc, char *argv[]) } continue; } - } while (!global_sigint_cnt && !global_sigterm_cnt); - + } /*** clean up ***/ mx_log_info("global_sigint_cnt=%d global_sigterm_cnt=%d : Exiting.", global_sigint_cnt, global_sigterm_cnt);