Skip to content

Commit

Permalink
Merge branch 'issue-51-nolock' into mpi
Browse files Browse the repository at this point in the history
* issue-51-nolock:
  mxq_job: Select job to run without locking
  • Loading branch information
donald committed Jun 13, 2017
2 parents 7276703 + fc6c8c4 commit 77da1b6
Showing 1 changed file with 55 additions and 7 deletions.
62 changes: 55 additions & 7 deletions mxq_job.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,58 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **
return res;
}

uint64_t mxq_select_job_from_group(struct mx_mysql *mysql, uint64_t group_id)
{
struct mx_mysql_bind param = {0};
struct mx_mysql_bind result = {0};
uint64_t job_id;
uint64_t *job_id_out;
int res;

char *query =
"SELECT"
" job_id"
" FROM"
" mxq_job"
" WHERE group_id = ?"
" AND job_status = " status_str(MXQ_JOB_STATUS_INQ)
" AND daemon_id = 0"
" AND host_hostname = ''"
" AND server_id = ''"
" AND host_pid = 0"
" ORDER BY"
" job_priority,"
" job_id"
" LIMIT 1";

res = mx_mysql_bind_init_param(&param, 1);
assert(res==0);

res += mx_mysql_bind_var(&param,0, uint64, &group_id);
assert(res==0);

res = mx_mysql_bind_init_result(&result, 1);
assert(res==0);

res += mx_mysql_bind_var(&result,0,uint64,&job_id);
assert(res==0);

res = mx_mysql_do_statement(mysql, query, &param, &result, &job_id, (void **)&job_id_out,sizeof(*job_id_out));
if (res==1) {
job_id=job_id_out[0];
} else {
job_id=0;
}
free(job_id_out);
return(job_id);
}

int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job)
{
struct mx_mysql_bind param = {0};
int res;
int idx;
uint64_t job_id;

assert(mysql);
assert(daemon);
Expand All @@ -314,16 +361,17 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
" host_slots = ?, "
" server_id = ?,"
" job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED)
" WHERE group_id = ?"
" WHERE job_id = ?"
" AND job_status = " status_str(MXQ_JOB_STATUS_INQ)
" AND daemon_id = 0"
" AND host_hostname = ''"
" AND server_id = ''"
" AND host_pid = 0"
" ORDER BY"
" job_priority,"
" job_id"
" LIMIT 1";
" AND host_pid = 0";

job_id=mxq_select_job_from_group(mysql,group_id);
if (!job_id) {
return(0);
}

res = mx_mysql_bind_init_param(&param, 5);
assert(res == 0);
Expand All @@ -334,7 +382,7 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
res += mx_mysql_bind_var(&param, idx++, string, &daemon->hostname);
res += mx_mysql_bind_var(&param, idx++, uint64, &slots_per_job);
res += mx_mysql_bind_var(&param, idx++, string, &daemon->daemon_name);
res += mx_mysql_bind_var(&param, idx++, uint64, &group_id);
res += mx_mysql_bind_var(&param, idx++, uint64, &job_id);
assert(res == 0);

res = mx_mysql_do_statement_noresult(mysql, query, &param);
Expand Down

0 comments on commit 77da1b6

Please sign in to comment.