Skip to content

Commit

Permalink
mxq_job: Select job to run without locking
Browse files Browse the repository at this point in the history
When the server chose a job to assign to itself by

   UPDATE mxq_job SET ... WHERE (unassigned) ORDER BY LIMIT 1

we had a performance problem when a group contained very many
pending jobs, because the UPDATE temporarily locked all records
considered (issue #51).

Now we SELECT without any lock and then UPDATE the single
job by its primary index. In case there is a collision
with another server, the second UPDATE will have no effect,
because it still contains the original contraints. The
second server will returns to its main loop where it
will sleep for some seconds.
  • Loading branch information
donald committed May 4, 2017
1 parent 7276703 commit fc6c8c4
Showing 1 changed file with 55 additions and 7 deletions.
62 changes: 55 additions & 7 deletions mxq_job.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,58 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **
return res;
}

uint64_t mxq_select_job_from_group(struct mx_mysql *mysql, uint64_t group_id)
{
struct mx_mysql_bind param = {0};
struct mx_mysql_bind result = {0};
uint64_t job_id;
uint64_t *job_id_out;
int res;

char *query =
"SELECT"
" job_id"
" FROM"
" mxq_job"
" WHERE group_id = ?"
" AND job_status = " status_str(MXQ_JOB_STATUS_INQ)
" AND daemon_id = 0"
" AND host_hostname = ''"
" AND server_id = ''"
" AND host_pid = 0"
" ORDER BY"
" job_priority,"
" job_id"
" LIMIT 1";

res = mx_mysql_bind_init_param(&param, 1);
assert(res==0);

res += mx_mysql_bind_var(&param,0, uint64, &group_id);
assert(res==0);

res = mx_mysql_bind_init_result(&result, 1);
assert(res==0);

res += mx_mysql_bind_var(&result,0,uint64,&job_id);
assert(res==0);

res = mx_mysql_do_statement(mysql, query, &param, &result, &job_id, (void **)&job_id_out,sizeof(*job_id_out));
if (res==1) {
job_id=job_id_out[0];
} else {
job_id=0;
}
free(job_id_out);
return(job_id);
}

int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job)
{
struct mx_mysql_bind param = {0};
int res;
int idx;
uint64_t job_id;

assert(mysql);
assert(daemon);
Expand All @@ -314,16 +361,17 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
" host_slots = ?, "
" server_id = ?,"
" job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED)
" WHERE group_id = ?"
" WHERE job_id = ?"
" AND job_status = " status_str(MXQ_JOB_STATUS_INQ)
" AND daemon_id = 0"
" AND host_hostname = ''"
" AND server_id = ''"
" AND host_pid = 0"
" ORDER BY"
" job_priority,"
" job_id"
" LIMIT 1";
" AND host_pid = 0";

job_id=mxq_select_job_from_group(mysql,group_id);
if (!job_id) {
return(0);
}

res = mx_mysql_bind_init_param(&param, 5);
assert(res == 0);
Expand All @@ -334,7 +382,7 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
res += mx_mysql_bind_var(&param, idx++, string, &daemon->hostname);
res += mx_mysql_bind_var(&param, idx++, uint64, &slots_per_job);
res += mx_mysql_bind_var(&param, idx++, string, &daemon->daemon_name);
res += mx_mysql_bind_var(&param, idx++, uint64, &group_id);
res += mx_mysql_bind_var(&param, idx++, uint64, &job_id);
assert(res == 0);

res = mx_mysql_do_statement_noresult(mysql, query, &param);
Expand Down

0 comments on commit fc6c8c4

Please sign in to comment.