From 9007b4a78a9287a46d31ec53da440dfe5553c680 Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Sun, 7 May 2017 18:30:19 +0200 Subject: [PATCH 1/4] mxq_job: allow to set limit for assigning jobs to deamon no logic was changed in this commit renamed mxq_assign_job_from_group_to_daemon() to mxq_assign_jobs_from_group_to_daemon() and added parameter limit relates to issue #51 --- mxq_job.c | 13 +++++++------ mxq_job.h | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/mxq_job.c b/mxq_job.c index eea849e9..88a1853b 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -291,7 +291,7 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job ** return res; } -int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job) +int mxq_assign_jobs_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job, uint64_t limit) { struct mx_mysql_bind param = {0}; int res; @@ -323,9 +323,9 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i " ORDER BY" " job_priority," " job_id" - " LIMIT 1"; + " LIMIT ?"; - res = mx_mysql_bind_init_param(¶m, 5); + res = mx_mysql_bind_init_param(¶m, 6); assert(res == 0); idx = 0; @@ -335,6 +335,7 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i res += mx_mysql_bind_var(¶m, idx++, uint64, &slots_per_job); res += mx_mysql_bind_var(¶m, idx++, string, &daemon->daemon_name); res += mx_mysql_bind_var(¶m, idx++, uint64, &group_id); + res += mx_mysql_bind_var(¶m, idx++, uint64, &limit); assert(res == 0); res = mx_mysql_do_statement_noresult(mysql, query, ¶m); @@ -703,13 +704,13 @@ int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *j break; } - res = mxq_assign_job_from_group_to_daemon(mysql, group_id, daemon, slots_per_job); + res = mxq_assign_jobs_from_group_to_daemon(mysql, group_id, daemon, slots_per_job, 1); if (res < 0) { - mx_log_err(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): %m", group_id); + mx_log_err(" group_id=%lu :: mxq_assign_jobs_from_group_to_daemon(): %m", group_id); return 0; } if (res == 0) { - mx_log_warning(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): No matching job found - maybe another server was a bit faster. ;)", group_id); + mx_log_warning(" group_id=%lu :: mxq_assign_jobs_from_group_to_daemon(): No matching job found - maybe another server was a bit faster. ;)", group_id); return 0; } } while (1); diff --git a/mxq_job.h b/mxq_job.h index cca5b561..7392588d 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -101,7 +101,7 @@ void mxq_job_free_content(struct mxq_job *j); int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id); int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp); int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status); -int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job); +int mxq_assign_jobs_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job, uint64_t limit); int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, struct mxq_daemon *daemon); int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job); From b329b7a64ce0875e10669cf3eb0720c79cc95421 Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Sun, 7 May 2017 18:55:24 +0200 Subject: [PATCH 2/4] mxqd: always assign jobs_max jobs form group to daemon if no jobs are assigned relates to issue #51 --- mxq_job.c | 5 +++-- mxq_job.h | 2 +- mxqd.c | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/mxq_job.c b/mxq_job.c index 88a1853b..25d6338f 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -682,7 +682,7 @@ int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mx } int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *job, uint64_t group_id, struct mxq_daemon *daemon, - unsigned long slots_per_job) + unsigned long slots_per_job, uint64_t jobs_max) { int res; struct mxq_job *jobs_tmp = NULL; @@ -690,6 +690,7 @@ int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *j assert(mysql); assert(job); assert(daemon); + assert(jobs_max); do { res = mxq_load_job_from_group_assigned_to_daemon(mysql, &jobs_tmp, group_id, daemon); @@ -704,7 +705,7 @@ int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *j break; } - res = mxq_assign_jobs_from_group_to_daemon(mysql, group_id, daemon, slots_per_job, 1); + res = mxq_assign_jobs_from_group_to_daemon(mysql, group_id, daemon, slots_per_job, jobs_max); if (res < 0) { mx_log_err(" group_id=%lu :: mxq_assign_jobs_from_group_to_daemon(): %m", group_id); return 0; diff --git a/mxq_job.h b/mxq_job.h index 7392588d..6c05577f 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -109,6 +109,6 @@ int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job); int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job); int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j); int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mxq_job **jobs_result, uint64_t group_id, struct mxq_daemon *daemon); -int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job); +int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job, uint64_t jobs_max); int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **jobs_result, struct mxq_daemon *daemon); #endif diff --git a/mxqd.c b/mxqd.c index 315df8db..3c5e5e17 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1174,7 +1174,7 @@ unsigned long start_job(struct mxq_group_list *glist) group = &glist->group; job = &_mxqjob; - res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon, glist->slots_per_job); + res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon, glist->slots_per_job, glist->jobs_max); if (!res) { return 0; } From db6c3e45b4a32a296405366a469161d9803222af Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Sun, 7 May 2017 19:18:41 +0200 Subject: [PATCH 3/4] mxq_job: Add missing assertions --- mxq_job.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mxq_job.c b/mxq_job.c index 25d6338f..28fa0299 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -304,6 +304,8 @@ int mxq_assign_jobs_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_ assert(daemon->daemon_name); assert(*daemon->daemon_name); assert(daemon->daemon_id); + assert(slots_per_job); + assert(limit); char *query = "UPDATE " From 72f254356acc14602cbbc53bfb686cfc493df6a1 Mon Sep 17 00:00:00 2001 From: Marius Tolzmann Date: Sun, 7 May 2017 19:32:37 +0200 Subject: [PATCH 4/4] mxqsub: Retry on deadlock in mysql database relates to issue #51 --- mx_mysql.c | 1 + 1 file changed, 1 insertion(+) diff --git a/mx_mysql.c b/mx_mysql.c index 79922d0d..7e3fc231 100644 --- a/mx_mysql.c +++ b/mx_mysql.c @@ -422,6 +422,7 @@ static int mx__mysql_stmt_execute(struct mx_mysql_stmt *stmt) case CR_SERVER_GONE_ERROR: case CR_SERVER_LOST: + case ER_LOCK_DEADLOCK: return -(errno=EAGAIN); case CR_UNKNOWN_ERROR: