Skip to content

Commit

Permalink
Merge pull request #57 from mariux64/issue-51-limit-increase
Browse files Browse the repository at this point in the history
Issue 51 limit increase
  • Loading branch information
donald authored May 10, 2017
2 parents 9802442 + 72f2543 commit b2298db
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 10 deletions.
1 change: 1 addition & 0 deletions mx_mysql.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ static int mx__mysql_stmt_execute(struct mx_mysql_stmt *stmt)

case CR_SERVER_GONE_ERROR:
case CR_SERVER_LOST:
case ER_LOCK_DEADLOCK:
return -(errno=EAGAIN);

case CR_UNKNOWN_ERROR:
Expand Down
18 changes: 11 additions & 7 deletions mxq_job.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **
return res;
}

int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job)
int mxq_assign_jobs_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job, uint64_t limit)
{
struct mx_mysql_bind param = {0};
int res;
Expand All @@ -304,6 +304,8 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
assert(daemon->daemon_name);
assert(*daemon->daemon_name);
assert(daemon->daemon_id);
assert(slots_per_job);
assert(limit);

char *query =
"UPDATE "
Expand All @@ -323,9 +325,9 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
" ORDER BY"
" job_priority,"
" job_id"
" LIMIT 1";
" LIMIT ?";

res = mx_mysql_bind_init_param(&param, 5);
res = mx_mysql_bind_init_param(&param, 6);
assert(res == 0);

idx = 0;
Expand All @@ -335,6 +337,7 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
res += mx_mysql_bind_var(&param, idx++, uint64, &slots_per_job);
res += mx_mysql_bind_var(&param, idx++, string, &daemon->daemon_name);
res += mx_mysql_bind_var(&param, idx++, uint64, &group_id);
res += mx_mysql_bind_var(&param, idx++, uint64, &limit);
assert(res == 0);

res = mx_mysql_do_statement_noresult(mysql, query, &param);
Expand Down Expand Up @@ -681,14 +684,15 @@ int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mx
}

int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *job, uint64_t group_id, struct mxq_daemon *daemon,
unsigned long slots_per_job)
unsigned long slots_per_job, uint64_t jobs_max)
{
int res;
struct mxq_job *jobs_tmp = NULL;

assert(mysql);
assert(job);
assert(daemon);
assert(jobs_max);

do {
res = mxq_load_job_from_group_assigned_to_daemon(mysql, &jobs_tmp, group_id, daemon);
Expand All @@ -703,13 +707,13 @@ int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *j
break;
}

res = mxq_assign_job_from_group_to_daemon(mysql, group_id, daemon, slots_per_job);
res = mxq_assign_jobs_from_group_to_daemon(mysql, group_id, daemon, slots_per_job, jobs_max);
if (res < 0) {
mx_log_err(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): %m", group_id);
mx_log_err(" group_id=%lu :: mxq_assign_jobs_from_group_to_daemon(): %m", group_id);
return 0;
}
if (res == 0) {
mx_log_warning(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): No matching job found - maybe another server was a bit faster. ;)", group_id);
mx_log_warning(" group_id=%lu :: mxq_assign_jobs_from_group_to_daemon(): No matching job found - maybe another server was a bit faster. ;)", group_id);
return 0;
}
} while (1);
Expand Down
4 changes: 2 additions & 2 deletions mxq_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ void mxq_job_free_content(struct mxq_job *j);
int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id);
int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp);
int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status);
int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job);
int mxq_assign_jobs_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job, uint64_t limit);
int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, struct mxq_daemon *daemon);
int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j);
int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mxq_job **jobs_result, uint64_t group_id, struct mxq_daemon *daemon);
int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job);
int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job, uint64_t jobs_max);
int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **jobs_result, struct mxq_daemon *daemon);
#endif
2 changes: 1 addition & 1 deletion mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ unsigned long start_job(struct mxq_group_list *glist)
group = &glist->group;
job = &_mxqjob;

res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon, glist->slots_per_job);
res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon, glist->slots_per_job, glist->jobs_max);
if (!res) {
return 0;
}
Expand Down

0 comments on commit b2298db

Please sign in to comment.