Skip to content

Commit

Permalink
mxqd: Recover from previous server crash
Browse files Browse the repository at this point in the history
fixes mariux64#8

 - unassign already assigned jobs
 - change status to unknown for loaded and/or running jobs
mariux committed Jul 29, 2015
1 parent 3437dff commit f1ca63e
Showing 3 changed files with 116 additions and 3 deletions.
72 changes: 72 additions & 0 deletions mxq_job.c
Original file line number Diff line number Diff line change
@@ -338,6 +338,42 @@ int mxq_assign_job_from_group_to_server(struct mx_mysql *mysql, uint64_t group_i
return res;
}

int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *server_id)
{
int res;
struct mx_mysql_bind param = {0};

assert(mysql);
assert(hostname);
assert(*hostname);
assert(server_id);
assert(*server_id);

char *query =
"UPDATE mxq_job SET"
" job_status = " status_str(MXQ_JOB_STATUS_INQ)
" WHERE host_pid = 0"
" AND job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED)
" AND host_hostname = ?"
" AND server_id = ?";

res = mx_mysql_bind_init_param(&param, 2);
assert(res == 0);

res = 0;
res += mx_mysql_bind_var(&param, 0, string, &hostname);
res += mx_mysql_bind_var(&param, 1, string, &server_id);
assert(res == 0);

res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param);
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}

return res;
}

int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job)
{
int res;
@@ -510,6 +546,42 @@ int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job)
return res;
}

int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname, char *server_id)
{
int res;
struct mx_mysql_bind param = {0};

assert(mysql);
assert(hostname);
assert(*hostname);
assert(server_id);
assert(*server_id);

char *query =
"UPDATE mxq_job SET"
" job_status = " status_str(MXQ_JOB_STATUS_UNKNOWN) ","
" date_end = NULL"
" WHERE job_status IN (" status_str(MXQ_JOB_STATUS_LOADED) "," status_str(MXQ_JOB_STATUS_RUNNING) ")"
" AND host_hostname = ?"
" AND server_id = ?";

res = mx_mysql_bind_init_param(&param, 2);
assert(res == 0);

res = 0;
res += mx_mysql_bind_var(&param, 0, string, &hostname);
res += mx_mysql_bind_var(&param, 1, string, &server_id);
assert(res == 0);

res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param);
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}

return res;
}

int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j)
{
if (!mx_streq(j->job_stdout, "/dev/null")) {
2 changes: 2 additions & 0 deletions mxq_job.h
Original file line number Diff line number Diff line change
@@ -93,9 +93,11 @@ int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job
int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp);
int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status);
int mxq_assign_job_from_group_to_server(struct mx_mysql *mysql, uint64_t group_id, char *hostname, char *server_id);
/*
 * Set jobs with host_pid = 0 that are in assigned state for hostname/server_id
 * back to the in-queue state. Returns a negative value on failure.
 * NOTE(review): the caller treats a positive return as the number of jobs
 * changed - confirm against the mx_mysql statement helper.
 */
int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *server_id);
int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job);
/*
 * Set jobs in loaded or running state for hostname/server_id to the unknown
 * state and clear their date_end. Returns a negative value on failure.
 * NOTE(review): the caller treats a positive return as the number of jobs
 * changed - confirm against the mx_mysql statement helper.
 */
int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname, char *server_id);
int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j);
int mxq_load_job_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id);
int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id);
45 changes: 42 additions & 3 deletions mxqd.c
Original file line number Diff line number Diff line change
@@ -1552,6 +1552,37 @@ int load_groups(struct mxq_server *server) {
return total;
}

/*
 * Clean up job state left in the database for this server's
 * hostname/server_id pair, e.g. after a previous instance crashed.
 *
 * Two steps are performed in order:
 *   1. mxq_unassign_jobs_of_server() - jobs still marked as assigned are
 *      put back into the queue.
 *   2. mxq_set_job_status_unknown_for_server() - jobs marked as loaded or
 *      running get their status set to unknown.
 *
 * server, server->mysql, server->hostname and server->server_id must be
 * non-NULL (enforced with assert()).
 *
 * Returns the negative result of the first step that fails (the second step
 * is not attempted if the first one fails); otherwise the sum of both step
 * results, which is logged as the number of jobs touched.
 */
int recover_from_previous_crash(struct mxq_server *server)
{
    int res1;
    int res2;

    assert(server);
    assert(server->mysql);
    assert(server->hostname);
    assert(server->server_id);

    res1 = mxq_unassign_jobs_of_server(server->mysql, server->hostname, server->server_id);
    if (res1 < 0) {
        mx_log_info("mxq_unassign_jobs_of_server() failed: %m");
        return res1;
    }
    if (res1 > 0) {
        mx_log_info("hostname=%s server_id=%s :: recovered from previous crash: unassigned %d jobs.",
                    server->hostname, server->server_id, res1);
    }

    res2 = mxq_set_job_status_unknown_for_server(server->mysql, server->hostname, server->server_id);
    if (res2 < 0) {
        /* Name the call that actually failed; the message used to blame
         * mxq_unassign_jobs_of_server() here as well. */
        mx_log_info("mxq_set_job_status_unknown_for_server() failed: %m");
        return res2;
    }
    if (res2 > 0) {
        mx_log_info("hostname=%s server_id=%s :: recovered from previous crash: set job_status='unknown' for %d jobs.",
                    server->hostname, server->server_id, res2);
    }

    return res1 + res2;
}

/**********************************************************************/
/*
 * Signal handler that deliberately does nothing.
 * main() installs it for SIGCHLD instead of SIG_IGN.
 */
static void no_handler(int sig)
{
    (void)sig; /* unused */
}

@@ -1578,6 +1609,7 @@ int main(int argc, char *argv[])
unsigned long slots_returned = 0;

int res;
int fail = 0;

/*** server init ***/

@@ -1611,7 +1643,15 @@ int main(int argc, char *argv[])
signal(SIGTTOU, SIG_IGN);
signal(SIGCHLD, no_handler);

do {
res = recover_from_previous_crash(&server);
if (res < 0) {
mx_log_warning("recover_from_previous_crash() failed. Aborting execution.");
fail = 1;
}
if (res > 0)
mx_log_warning("total %d jobs recovered from previous crash.");

while (!global_sigint_cnt && !global_sigterm_cnt && !fail) {
slots_returned = catchall(&server);
if (slots_returned)
mx_log_info("slots_returned=%lu :: Main Loop freed %lu slots.", slots_returned, slots_returned);
@@ -1658,8 +1698,7 @@ int main(int argc, char *argv[])
}
continue;
}
} while (!global_sigint_cnt && !global_sigterm_cnt);

}
/*** clean up ***/

mx_log_info("global_sigint_cnt=%d global_sigterm_cnt=%d : Exiting.", global_sigint_cnt, global_sigterm_cnt);

0 comments on commit f1ca63e

Please sign in to comment.