Skip to content
Navigation Menu
Toggle navigation
Sign in
In this repository
All GitHub Enterprise
↵
Jump to
↵
No suggested jump to results
In this repository
All GitHub Enterprise
↵
Jump to
↵
In this user
All GitHub Enterprise
↵
Jump to
↵
In this repository
All GitHub Enterprise
↵
Jump to
↵
Sign in
Reseting focus
You signed in with another tab or window.
Reload
to refresh your session.
You signed out in another tab or window.
Reload
to refresh your session.
You switched accounts on another tab or window.
Reload
to refresh your session.
Dismiss alert
{{ message }}
mariux
/
mxq
Public
forked from
mariux64/mxq
Notifications
You must be signed in to change notification settings
Fork
0
Star
0
Code
Pull requests
0
Actions
Projects
0
Security
Insights
Issues
Additional navigation options
Code
Pull requests
Actions
Projects
Security
Insights
Issues
Files
9c739de
manpages
mysql
web
.gitignore
LICENSE
Makefile
README.md
mx_flock.c
mx_flock.h
mx_getopt.c
mx_getopt.h
mx_log.c
mx_log.h
mx_mysql.c
mx_mysql.h
mx_util.c
mx_util.h
mxq.h
mxq_group.c
mxq_group.h
mxq_job.c
mxq_job.h
mxq_log.c
mxqadmin.c
mxqd.c
mxqd.h
mxqdctl-hostconfig.sh
mxqdump.c
mxqkill.c
mxqsub.c
os-release
test.c
test_mx_log.c
test_mx_mysql.c
test_mx_util.c
Breadcrumbs
mxq
/
mxq_job.c
Blame
Blame
Latest commit
History
History
702 lines (589 loc) · 22.1 KB
Breadcrumbs
mxq
/
mxq_job.c
Top
File metadata and controls
Code
Blame
702 lines (589 loc) · 22.1 KB
Raw
#define _GNU_SOURCE #include <stdio.h> #include <mysql.h> #include <errno.h> #include <assert.h> #include <unistd.h> #include <libgen.h> #include <string.h> #include <sys/resource.h> #include "mx_util.h" #include "mx_log.h" #include "mxq_group.h" #include "mxq_job.h" #define JOB_FIELDS_CNT 35 #define JOB_FIELDS \ " job_id, " \ " job_status, " \ " job_flags, " \ " job_priority, " \ " group_id, " \ \ " job_workdir, " \ " job_argc, " \ " job_argv, " \ " job_stdout, " \ " job_stderr, " \ \ " job_umask, " \ " host_submit, " \ " host_id, " \ " server_id, " \ " host_hostname, " \ \ " host_pid, " \ " host_slots, " \ " UNIX_TIMESTAMP(date_submit) as date_submit, " \ " UNIX_TIMESTAMP(date_start) as date_start, " \ " UNIX_TIMESTAMP(date_end) as date_end, " \ \ " stats_status, " \ " stats_utime_sec, " \ " stats_utime_usec, " \ " stats_stime_sec, " \ " stats_stime_usec, " \ \ " stats_real_sec, " \ " stats_real_usec, " \ " stats_maxrss, " \ " stats_minflt, " \ " stats_majflt, " \ \ " stats_nswap, " \ " stats_inblock, " \ " stats_oublock, " \ " stats_nvcsw, " \ " stats_nivcsw" static int bind_result_job_fields(struct mx_mysql_bind *result, struct mxq_job *j) { int res = 0; int idx = 0; res = mx_mysql_bind_init_result(result, JOB_FIELDS_CNT); assert(res >= 0); res += mx_mysql_bind_var(result, idx++, uint64, &(j->job_id)); res += mx_mysql_bind_var(result, idx++, uint16, &(j->job_status)); res += mx_mysql_bind_var(result, idx++, uint64, &(j->job_flags)); res += mx_mysql_bind_var(result, idx++, uint16, &(j->job_priority)); res += mx_mysql_bind_var(result, idx++, uint64, &(j->group_id)); res += mx_mysql_bind_var(result, idx++, string, &(j->job_workdir)); res += mx_mysql_bind_var(result, idx++, uint16, &(j->job_argc)); res += mx_mysql_bind_var(result, idx++, string, &(j->job_argv_str)); res += mx_mysql_bind_var(result, idx++, string, &(j->job_stdout)); res += mx_mysql_bind_var(result, idx++, string, &(j->job_stderr)); res += mx_mysql_bind_var(result, idx++, uint32, &(j->job_umask)); res += mx_mysql_bind_var(result, idx++, string, &(j->host_submit)); res += mx_mysql_bind_var(result, idx++, string, &(j->host_id)); res += mx_mysql_bind_var(result, idx++, string, &(j->server_id)); res += mx_mysql_bind_var(result, idx++, string, &(j->host_hostname)); res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_pid)); res += mx_mysql_bind_var(result, idx++, uint32, &(j->host_slots)); res += mx_mysql_bind_var(result, idx++, int64, &(j->date_submit)); res += mx_mysql_bind_var(result, idx++, int64, &(j->date_start)); res += mx_mysql_bind_var(result, idx++, int64, &(j->date_end)); res += mx_mysql_bind_var(result, idx++, int32, &(j->stats_status)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_utime.tv_sec)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_utime.tv_usec)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_stime.tv_sec)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_stime.tv_usec)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_realtime.tv_sec)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_realtime.tv_usec)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_maxrss)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_minflt)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_majflt)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_nswap)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_inblock)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_oublock)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_nvcsw)); res += mx_mysql_bind_var(result, idx++, int64, &(j->stats_rusage.ru_nivcsw)); return res; } char *mxq_job_status_to_name(uint64_t status) { switch (status) { case MXQ_JOB_STATUS_INQ: return "inq"; case MXQ_JOB_STATUS_ASSIGNED: return "assigned"; case MXQ_JOB_STATUS_LOADED: return "loaded"; case MXQ_JOB_STATUS_RUNNING: return "running"; case MXQ_JOB_STATUS_UNKNOWN_RUN: return "running-unknown"; case MXQ_JOB_STATUS_EXTRUNNING: return "running-external"; case MXQ_JOB_STATUS_STOPPED: return "stopped"; case MXQ_JOB_STATUS_EXIT: return "exited"; case MXQ_JOB_STATUS_KILLING: return "killing"; case MXQ_JOB_STATUS_KILLED: return "killed"; case MXQ_JOB_STATUS_FAILED: return "failed"; case MXQ_JOB_STATUS_UNKNOWN_PRE: return "unknownpre"; case MXQ_JOB_STATUS_CANCELLED: return "cancelled"; case MXQ_JOB_STATUS_CANCELLING: return "cancelling"; case MXQ_JOB_STATUS_UNKNOWN: return "unknown"; case MXQ_JOB_STATUS_FINISHED: return "finished"; } return "invalid"; } void mxq_job_free_content(struct mxq_job *j) { mx_free_null(j->job_workdir); mx_free_null(j->job_argv_str); mx_free_null(j->job_stdout); mx_free_null(j->job_stderr); if (j->tmp_stderr == j->tmp_stdout) { j->tmp_stdout = NULL; } else { mx_free_null(j->tmp_stdout); } mx_free_null(j->tmp_stderr); mx_free_null(j->host_submit); mx_free_null(j->host_id); mx_free_null(j->server_id); mx_free_null(j->host_hostname); mx_free_null(j->job_argv); j->job_argv = NULL; } int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id) { int res; struct mxq_job *jobs = NULL; struct mxq_job j = {0}; struct mx_mysql_bind param = {0}; struct mx_mysql_bind result = {0}; assert(mysql); assert(mxq_jobs); assert(!(*mxq_jobs)); char *query = "SELECT" JOB_FIELDS " FROM mxq_job" " WHERE job_id = ?" " LIMIT 1"; res = mx_mysql_bind_init_param(¶m, 1); assert(res == 0); res = mx_mysql_bind_var(¶m, 0, uint64, &job_id); assert(res == 0); res = bind_result_job_fields(&result, &j); assert(res == 0); res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); if (res < 0) { mx_log_err("mx_mysql_do_statement(): %m"); return res; } *mxq_jobs = jobs; return res; } int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp) { int res; struct mxq_job *jobs = NULL; struct mxq_job j = {0}; struct mx_mysql_bind param = {0}; struct mx_mysql_bind result = {0}; assert(mysql); assert(mxq_jobs); assert(!(*mxq_jobs)); char *query = "SELECT" JOB_FIELDS " FROM mxq_job" " WHERE group_id = ? OR 1 = 0" " ORDER BY server_id, host_hostname, job_id"; res = mx_mysql_bind_init_param(¶m, 1); assert(res == 0); res = mx_mysql_bind_var(¶m, 0, uint64, &(grp->group_id)); assert(res == 0); res = bind_result_job_fields(&result, &j); assert(res == 0); res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); if (res < 0) { mx_log_err("mx_mysql_do_statement(): %m"); return res; } *mxq_jobs = jobs; return res; } int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status) { int res; struct mxq_job *jobs = NULL; struct mxq_job j = {0}; struct mx_mysql_bind param = {0}; struct mx_mysql_bind result = {0}; assert(mysql); assert(mxq_jobs); assert(!(*mxq_jobs)); char *query = "SELECT" JOB_FIELDS " FROM mxq_job" " WHERE group_id = ?" " AND job_status = ?" " ORDER BY server_id, host_hostname, job_id"; res = mx_mysql_bind_init_param(¶m, 2); assert(res == 0); res = 0; res += mx_mysql_bind_var(¶m, 0, uint64, &(grp->group_id)); res += mx_mysql_bind_var(¶m, 1, uint64, &job_status); assert(res == 0); res = bind_result_job_fields(&result, &j); assert(res == 0); res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); if (res < 0) { mx_log_err("mx_mysql_do_statement(): %m"); return res; } *mxq_jobs = jobs; return res; } int mxq_assign_job_from_group_to_server(struct mx_mysql *mysql, uint64_t group_id, char *hostname, char *server_id) { int res; struct mx_mysql_bind param = {0}; assert(mysql); assert(hostname); assert(*hostname); assert(server_id); assert(*server_id); char *query = "UPDATE mxq_job SET" " job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) "," " host_hostname = ?," " server_id = ?" " WHERE group_id = ?" " AND job_status = " status_str(MXQ_JOB_STATUS_INQ) " AND host_hostname = ''" " AND server_id = ''" " AND host_pid = 0" " ORDER BY job_priority, job_id" " LIMIT 1"; res = mx_mysql_bind_init_param(¶m, 3); assert(res == 0); res = 0; res += mx_mysql_bind_var(¶m, 0, string, &hostname); res += mx_mysql_bind_var(¶m, 1, string, &server_id); res += mx_mysql_bind_var(¶m, 2, uint64, &group_id); assert(res == 0); res = mx_mysql_do_statement_noresult(mysql, query, ¶m); if (res < 0) { mx_log_err("mx_mysql_do_statement(): %m"); return res; } return res; } int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, char *hostname, char *server_id) { int res; struct mx_mysql_bind param = {0}; assert(mysql); assert(hostname); assert(*hostname); assert(server_id); assert(*server_id); char *query = "UPDATE mxq_job SET" " job_status = " status_str(MXQ_JOB_STATUS_INQ) " WHERE host_pid = 0" " AND job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) " AND host_hostname = ?" " AND server_id = ?"; res = mx_mysql_bind_init_param(¶m, 2); assert(res == 0); res = 0; res += mx_mysql_bind_var(¶m, 0, string, &hostname); res += mx_mysql_bind_var(¶m, 1, string, &server_id); assert(res == 0); res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, ¶m); if (res < 0) { mx_log_err("mx_mysql_do_statement(): %m"); return res; } return res; } int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job) { int res; struct mx_mysql_bind param = {0}; assert(mysql); assert(job); char *query = "UPDATE mxq_job SET" " job_status = " status_str(MXQ_JOB_STATUS_LOADED) " WHERE job_id = ?" " AND job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) " AND host_hostname = ?" " AND server_id = ?" " AND host_pid = 0"; res = mx_mysql_bind_init_param(¶m, 3); assert(res == 0); res = 0; res += mx_mysql_bind_var(¶m, 0, uint64, &(job->job_id)); res += mx_mysql_bind_var(¶m, 1, string, &(job->host_hostname)); res += mx_mysql_bind_var(¶m, 2, string, &(job->server_id)); assert(res == 0); res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, ¶m); if (res < 0) { mx_log_err("mx_mysql_do_statement(): %m"); return res; } job->job_status = MXQ_JOB_STATUS_LOADED; return res; } int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job) { int res; struct mx_mysql_bind param = {0}; assert(mysql); assert(job); if (job->job_status != MXQ_JOB_STATUS_LOADED) { mx_log_warning("new status==runnning but old status(=%d) is != loaded ", job->job_status); } char *query = "UPDATE mxq_job SET" " job_status = " status_str(MXQ_JOB_STATUS_RUNNING) "," " date_start = NULL," " host_pid = ?," " host_slots = ?" " WHERE job_id = ?" " AND job_status = " status_str(MXQ_JOB_STATUS_LOADED) " AND host_hostname = ?" " AND server_id = ?" " AND host_pid = 0"; res = mx_mysql_bind_init_param(¶m, 5); assert(res == 0); res = 0; res += mx_mysql_bind_var(¶m, 0, uint32, &(job->host_pid)); res += mx_mysql_bind_var(¶m, 1, uint32, &(job->host_slots)); res += mx_mysql_bind_var(¶m, 2, uint64, &(job->job_id)); res += mx_mysql_bind_var(¶m, 3, string, &(job->host_hostname)); res += mx_mysql_bind_var(¶m, 4, string, &(job->server_id)); assert(res == 0); res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, ¶m); if (res < 0) { mx_log_err("mx_mysql_do_statement(): %m"); return res; } job->job_status = MXQ_JOB_STATUS_RUNNING; return res; } int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job) { int res; uint16_t newstatus; struct mx_mysql_bind param = {0}; assert(mysql); assert(job); if (WIFEXITED(job->stats_status)) { if (WEXITSTATUS(job->stats_status)) { newstatus = MXQ_JOB_STATUS_FAILED; } else { newstatus = MXQ_JOB_STATUS_FINISHED; } } else if(WIFSIGNALED(job->stats_status)) { newstatus = MXQ_JOB_STATUS_KILLED; } else { mx_log_err("Status change to status_exit called with unknown stats_status (%d). Aborting Status change.", job->stats_status); errno = EINVAL; return -1; } if (job->job_status != MXQ_JOB_STATUS_RUNNING && job->job_status != MXQ_JOB_STATUS_KILLING) { mx_log_warning("new status==exited but old status(=%d) is != running ", job->job_status); } char *query = "UPDATE mxq_job SET" " job_status = ?," " date_end = NULL," " stats_status = ?, " " stats_utime_sec = ?, " " stats_utime_usec = ?, " " stats_stime_sec = ?, " " stats_stime_usec = ?, " " stats_real_sec = ?, " " stats_real_usec = ?, " " stats_maxrss = ?, " " stats_minflt = ?, " " stats_majflt = ?, " " stats_nswap = ?, " " stats_inblock = ?, " " stats_oublock = ?, " " stats_nvcsw = ?, " " stats_nivcsw = ?" " WHERE job_id = ?" " AND job_status IN (" status_str(MXQ_JOB_STATUS_LOADED) ", " status_str(MXQ_JOB_STATUS_RUNNING) ", " status_str(MXQ_JOB_STATUS_KILLING) ")" " AND host_hostname = ?" " AND server_id = ?" " AND host_pid = ?"; res = mx_mysql_bind_init_param(¶m, 20); assert(res == 0); res = 0; res += mx_mysql_bind_var(¶m, 0, uint16, &(newstatus)); res += mx_mysql_bind_var(¶m, 1, int32, &(job->stats_status)); res += mx_mysql_bind_var(¶m, 2, int64, &(job->stats_rusage.ru_utime.tv_sec)); res += mx_mysql_bind_var(¶m, 3, int64, &(job->stats_rusage.ru_utime.tv_usec)); res += mx_mysql_bind_var(¶m, 4, int64, &(job->stats_rusage.ru_stime.tv_sec)); res += mx_mysql_bind_var(¶m, 5, int64, &(job->stats_rusage.ru_stime.tv_usec)); res += mx_mysql_bind_var(¶m, 6, int64, &(job->stats_realtime.tv_sec)); res += mx_mysql_bind_var(¶m, 7, int64, &(job->stats_realtime.tv_usec)); res += mx_mysql_bind_var(¶m, 8, int64, &(job->stats_rusage.ru_maxrss)); res += mx_mysql_bind_var(¶m, 9, int64, &(job->stats_rusage.ru_minflt)); res += mx_mysql_bind_var(¶m, 10, int64, &(job->stats_rusage.ru_majflt)); res += mx_mysql_bind_var(¶m, 11, int64, &(job->stats_rusage.ru_nswap)); res += mx_mysql_bind_var(¶m, 12, int64, &(job->stats_rusage.ru_inblock)); res += mx_mysql_bind_var(¶m, 13, int64, &(job->stats_rusage.ru_oublock)); res += mx_mysql_bind_var(¶m, 14, int64, &(job->stats_rusage.ru_nvcsw)); res += mx_mysql_bind_var(¶m, 15, int64, &(job->stats_rusage.ru_nivcsw)); res += mx_mysql_bind_var(¶m, 16, uint64, &(job->job_id)); res += mx_mysql_bind_var(¶m, 17, string, &(job->host_hostname)); res += mx_mysql_bind_var(¶m, 18, string, &(job->server_id)); res += mx_mysql_bind_var(¶m, 19, uint32, &(job->host_pid)); assert(res == 0); res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, ¶m); if (res < 0) { mx_log_err("mx_mysql_do_statement(): %m"); return res; } job->job_status = newstatus; return res; } int mxq_set_job_status_unknown_for_server(struct mx_mysql *mysql, char *hostname, char *server_id) { int res; struct mx_mysql_bind param = {0}; assert(mysql); assert(hostname); assert(*hostname); assert(server_id); assert(*server_id); char *query = "UPDATE mxq_job SET" " job_status = " status_str(MXQ_JOB_STATUS_UNKNOWN) "," " date_end = NULL" " WHERE job_status IN (" status_str(MXQ_JOB_STATUS_LOADED) "," status_str(MXQ_JOB_STATUS_RUNNING) "," status_str(MXQ_JOB_STATUS_KILLING) ")" " AND host_hostname = ?" " AND server_id = ?"; res = mx_mysql_bind_init_param(¶m, 2); assert(res == 0); res = 0; res += mx_mysql_bind_var(¶m, 0, string, &hostname); res += mx_mysql_bind_var(¶m, 1, string, &server_id); assert(res == 0); res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, ¶m); if (res < 0) { mx_log_err("mx_mysql_do_statement(): %m"); return res; } return res; } int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j) { if (!mx_streq(j->job_stdout, "/dev/null")) { _mx_cleanup_free_ char *dir = NULL; dir = mx_dirname_forever(j->job_stdout); mx_asprintf_forever(&j->tmp_stdout, "%s/mxq.%u.%lu.%lu.%s.%s.%d.stdout.tmp", dir, g->user_uid, g->group_id, j->job_id, j->host_hostname, j->server_id, j->host_pid); } if (!mx_streq(j->job_stderr, "/dev/null")) { _mx_cleanup_free_ char *dir = NULL; if (mx_streq(j->job_stderr, j->job_stdout)) { j->tmp_stderr = j->tmp_stdout; return 1; } dir = mx_dirname_forever(j->job_stderr); mx_asprintf_forever(&j->tmp_stderr, "%s/mxq.%u.%lu.%lu.%s.%s.%d.stderr.tmp", dir, g->user_uid, g->group_id, j->job_id, j->host_hostname, j->server_id, j->host_pid); } return 1; } int mxq_load_job_assigned_to_server(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, char *hostname, char *server_id) { int res; struct mxq_job *jobs = NULL; struct mxq_job j = {0}; struct mx_mysql_bind param = {0}; struct mx_mysql_bind result = {0}; assert(mysql); assert(mxq_jobs); assert(!(*mxq_jobs)); assert(hostname); assert(*hostname); assert(server_id); assert(*server_id); char *query = "SELECT" JOB_FIELDS " FROM mxq_job" " WHERE job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED) " AND host_hostname = ?" " AND server_id = ?" " LIMIT 1"; res = mx_mysql_bind_init_param(¶m, 2); assert(res == 0); res = 0; res += mx_mysql_bind_var(¶m, 0, string, &hostname); res += mx_mysql_bind_var(¶m, 1, string, &server_id); assert(res == 0); res = bind_result_job_fields(&result, &j); assert(res == 0); res = mx_mysql_do_statement(mysql, query, ¶m, &result, &j, (void **)&jobs, sizeof(*jobs)); if (res < 0) { mx_log_err("mx_mysql_do_statement(): %m"); return res; } *mxq_jobs = jobs; return res; } int mxq_load_job_from_group_for_server(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, char *hostname, char *server_id) { int res; struct mxq_job *jobs = NULL; assert(mysql); assert(mxqjob); assert(hostname); assert(*hostname); assert(server_id); assert(*server_id); do { res = mxq_load_job_assigned_to_server(mysql, &jobs, hostname, server_id); if(res < 0) { mx_log_err(" group_id=%lu :: mxq_load_job_assigned_to_server: %m", group_id); return 0; } if(res == 1) { memcpy(mxqjob, &jobs[0], sizeof(*mxqjob)); break; } res = mxq_assign_job_from_group_to_server(mysql, group_id, hostname, server_id); if (res < 0) { mx_log_err(" group_id=%lu :: mxq_assign_job_from_group_to_server(): %m", group_id); return 0; } if (res == 0) { mx_log_warning(" group_id=%lu :: mxq_assign_job_from_group_to_server(): No matching job found - maybe another server was a bit faster. ;)", group_id); return 0; } } while (1); res = mxq_set_job_status_loaded_on_server(mysql, mxqjob); if (res < 0) { mx_log_err(" group_id=%lu job_id=%lu :: mxq_set_job_status_loaded_on_server(): %m", group_id, mxqjob->job_id); return 0; } if (res == 0) { mx_log_err(" group_id=%lu job_id=%lu :: mxq_set_job_status_loaded_on_server(): Job not found", group_id, mxqjob->job_id); return 0; } mxqjob->job_status = MXQ_JOB_STATUS_LOADED; return 1; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
You can’t perform that action at this time.