diff --git a/mx_mysql.c b/mx_mysql.c index 79922d0d..7e3fc231 100644 --- a/mx_mysql.c +++ b/mx_mysql.c @@ -422,6 +422,7 @@ static int mx__mysql_stmt_execute(struct mx_mysql_stmt *stmt) case CR_SERVER_GONE_ERROR: case CR_SERVER_LOST: + case ER_LOCK_DEADLOCK: return -(errno=EAGAIN); case CR_UNKNOWN_ERROR: diff --git a/mxq_job.c b/mxq_job.c index eea849e9..28fa0299 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -291,7 +291,7 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job ** return res; } -int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job) +int mxq_assign_jobs_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job, uint64_t limit) { struct mx_mysql_bind param = {0}; int res; @@ -304,6 +304,8 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i assert(daemon->daemon_name); assert(*daemon->daemon_name); assert(daemon->daemon_id); + assert(slots_per_job); + assert(limit); char *query = "UPDATE " @@ -323,9 +325,9 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i " ORDER BY" " job_priority," " job_id" - " LIMIT 1"; + " LIMIT ?"; - res = mx_mysql_bind_init_param(¶m, 5); + res = mx_mysql_bind_init_param(¶m, 6); assert(res == 0); idx = 0; @@ -335,6 +337,7 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i res += mx_mysql_bind_var(¶m, idx++, uint64, &slots_per_job); res += mx_mysql_bind_var(¶m, idx++, string, &daemon->daemon_name); res += mx_mysql_bind_var(¶m, idx++, uint64, &group_id); + res += mx_mysql_bind_var(¶m, idx++, uint64, &limit); assert(res == 0); res = mx_mysql_do_statement_noresult(mysql, query, ¶m); @@ -681,7 +684,7 @@ int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mx } int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *job, uint64_t group_id, struct mxq_daemon *daemon, - unsigned long slots_per_job) + unsigned long slots_per_job, uint64_t jobs_max) { int res; struct mxq_job *jobs_tmp = NULL; @@ -689,6 +692,7 @@ int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *j assert(mysql); assert(job); assert(daemon); + assert(jobs_max); do { res = mxq_load_job_from_group_assigned_to_daemon(mysql, &jobs_tmp, group_id, daemon); @@ -703,13 +707,13 @@ int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *j break; } - res = mxq_assign_job_from_group_to_daemon(mysql, group_id, daemon, slots_per_job); + res = mxq_assign_jobs_from_group_to_daemon(mysql, group_id, daemon, slots_per_job, jobs_max); if (res < 0) { - mx_log_err(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): %m", group_id); + mx_log_err(" group_id=%lu :: mxq_assign_jobs_from_group_to_daemon(): %m", group_id); return 0; } if (res == 0) { - mx_log_warning(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): No matching job found - maybe another server was a bit faster. ;)", group_id); + mx_log_warning(" group_id=%lu :: mxq_assign_jobs_from_group_to_daemon(): No matching job found - maybe another server was a bit faster. ;)", group_id); return 0; } } while (1); diff --git a/mxq_job.h b/mxq_job.h index cca5b561..6c05577f 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -101,7 +101,7 @@ void mxq_job_free_content(struct mxq_job *j); int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id); int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp); int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status); -int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job); +int mxq_assign_jobs_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job, uint64_t limit); int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, struct mxq_daemon *daemon); int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job); @@ -109,6 +109,6 @@ int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job); int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j); int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mxq_job **jobs_result, uint64_t group_id, struct mxq_daemon *daemon); -int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job); +int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job, uint64_t jobs_max); int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **jobs_result, struct mxq_daemon *daemon); #endif diff --git a/mxqd.c b/mxqd.c index 315df8db..3c5e5e17 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1174,7 +1174,7 @@ unsigned long start_job(struct mxq_group_list *glist) group = &glist->group; job = &_mxqjob; - res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon, glist->slots_per_job); + res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon, glist->slots_per_job, glist->jobs_max); if (!res) { return 0; }