Skip to content

Issue 51 limit increase #57

Merged
merged 4 commits into from
May 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions mx_mysql.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ static int mx__mysql_stmt_execute(struct mx_mysql_stmt *stmt)

case CR_SERVER_GONE_ERROR:
case CR_SERVER_LOST:
case ER_LOCK_DEADLOCK:
return -(errno=EAGAIN);

case CR_UNKNOWN_ERROR:
Expand Down
18 changes: 11 additions & 7 deletions mxq_job.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **
return res;
}

int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job)
int mxq_assign_jobs_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job, uint64_t limit)
{
struct mx_mysql_bind param = {0};
int res;
Expand All @@ -304,6 +304,8 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
assert(daemon->daemon_name);
assert(*daemon->daemon_name);
assert(daemon->daemon_id);
assert(slots_per_job);
assert(limit);

char *query =
"UPDATE "
Expand All @@ -323,9 +325,9 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
" ORDER BY"
" job_priority,"
" job_id"
" LIMIT 1";
" LIMIT ?";

res = mx_mysql_bind_init_param(&param, 5);
res = mx_mysql_bind_init_param(&param, 6);
assert(res == 0);

idx = 0;
Expand All @@ -335,6 +337,7 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
res += mx_mysql_bind_var(&param, idx++, uint64, &slots_per_job);
res += mx_mysql_bind_var(&param, idx++, string, &daemon->daemon_name);
res += mx_mysql_bind_var(&param, idx++, uint64, &group_id);
res += mx_mysql_bind_var(&param, idx++, uint64, &limit);
assert(res == 0);

res = mx_mysql_do_statement_noresult(mysql, query, &param);
Expand Down Expand Up @@ -681,14 +684,15 @@ int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mx
}

int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *job, uint64_t group_id, struct mxq_daemon *daemon,
unsigned long slots_per_job)
unsigned long slots_per_job, uint64_t jobs_max)
{
int res;
struct mxq_job *jobs_tmp = NULL;

assert(mysql);
assert(job);
assert(daemon);
assert(jobs_max);

do {
res = mxq_load_job_from_group_assigned_to_daemon(mysql, &jobs_tmp, group_id, daemon);
Expand All @@ -703,13 +707,13 @@ int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *j
break;
}

res = mxq_assign_job_from_group_to_daemon(mysql, group_id, daemon, slots_per_job);
res = mxq_assign_jobs_from_group_to_daemon(mysql, group_id, daemon, slots_per_job, jobs_max);
if (res < 0) {
mx_log_err(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): %m", group_id);
mx_log_err(" group_id=%lu :: mxq_assign_jobs_from_group_to_daemon(): %m", group_id);
return 0;
}
if (res == 0) {
mx_log_warning(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): No matching job found - maybe another server was a bit faster. ;)", group_id);
mx_log_warning(" group_id=%lu :: mxq_assign_jobs_from_group_to_daemon(): No matching job found - maybe another server was a bit faster. ;)", group_id);
return 0;
}
} while (1);
Expand Down
4 changes: 2 additions & 2 deletions mxq_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ void mxq_job_free_content(struct mxq_job *j);
int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id);
int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp);
int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status);
int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job);
int mxq_assign_jobs_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job, uint64_t limit);
int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, struct mxq_daemon *daemon);
int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j);
int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mxq_job **jobs_result, uint64_t group_id, struct mxq_daemon *daemon);
int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job);
int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job, uint64_t jobs_max);
int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **jobs_result, struct mxq_daemon *daemon);
#endif
2 changes: 1 addition & 1 deletion mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ unsigned long start_job(struct mxq_group_list *glist)
group = &glist->group;
job = &_mxqjob;

res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon, glist->slots_per_job);
res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon, glist->slots_per_job, glist->jobs_max);
if (!res) {
return 0;
}
Expand Down