From e6bfc5df9a519f3d8bc3827367e1be38f9ddee43 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 4 May 2017 16:37:29 +0200 Subject: [PATCH] mxq_job: Select job to run without lock When the server chose a job to assign to itself by UPDATE mxq_job SET ... WHERE (unassigned) ORDER BY LIMIT 1 we had a performance problem when a group contained very many pending jobs, because the UPDATE temporarily locked all records considered (issue #51). Now we SELECT without any lock and then UPDATE the single job by its primary index. In case there is a collision with another server, the second UPDATE will have no effect, because it still contains the original contraints. The second server will returns to its main loop where it will sleep for some seconds. --- mxq_job.c | 63 ++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 56 insertions(+), 7 deletions(-) diff --git a/mxq_job.c b/mxq_job.c index eea849e9..bb758a03 100644 --- a/mxq_job.c +++ b/mxq_job.c @@ -291,11 +291,59 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job ** return res; } +uint64_t mxq_select_job_from_group(struct mx_mysql *mysql, uint64_t group_id) +{ + struct mx_mysql_bind param = {0}; + struct mx_mysql_bind result = {0}; + uint64_t job_id; + uint64_t *job_id_out; + int res; + + char *query = + "SELECT" + " job_id" + " FROM" + " mxq_job" + " WHERE group_id = ?" + " AND job_status = " status_str(MXQ_JOB_STATUS_INQ) + " AND daemon_id = 0" + " AND host_hostname = ''" + " AND server_id = ''" + " AND host_pid = 0" + " ORDER BY" + " job_priority," + " job_id" + " LIMIT 1"; + + + res = mx_mysql_bind_init_param(¶m, 1); + assert(res==0); + + res += mx_mysql_bind_var(¶m,0, uint64, &group_id); + assert(res==0); + + res = mx_mysql_bind_init_result(&result, 1); + assert(res==0); + + res += mx_mysql_bind_var(&result,0,uint64,&job_id); + assert(res==0); + + res = mx_mysql_do_statement(mysql, query, ¶m, &result, &job_id, (void **)&job_id_out,sizeof(*job_id_out)); + if (res==1) { + job_id=job_id_out[0]; + } else { + job_id=0; + } + free(job_id_out); + return(job_id); +} + int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job) { struct mx_mysql_bind param = {0}; int res; int idx; + uint64_t job_id; assert(mysql); assert(daemon); @@ -314,16 +362,17 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i " host_slots = ?, " " server_id = ?," " job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) - " WHERE group_id = ?" + " WHERE job_id = ?" " AND job_status = " status_str(MXQ_JOB_STATUS_INQ) " AND daemon_id = 0" " AND host_hostname = ''" " AND server_id = ''" - " AND host_pid = 0" - " ORDER BY" - " job_priority," - " job_id" - " LIMIT 1"; + " AND host_pid = 0"; + + job_id=mxq_select_job_from_group(mysql,group_id); + if (!job_id) { + return(0); + } res = mx_mysql_bind_init_param(¶m, 5); assert(res == 0); @@ -334,7 +383,7 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i res += mx_mysql_bind_var(¶m, idx++, string, &daemon->hostname); res += mx_mysql_bind_var(¶m, idx++, uint64, &slots_per_job); res += mx_mysql_bind_var(¶m, idx++, string, &daemon->daemon_name); - res += mx_mysql_bind_var(¶m, idx++, uint64, &group_id); + res += mx_mysql_bind_var(¶m, idx++, uint64, &job_id); assert(res == 0); res = mx_mysql_do_statement_noresult(mysql, query, ¶m);