Skip to content

Commit

Permalink
mxqd: Use mx_mysql* and remove mxq_mysql* forever
Browse files Browse the repository at this point in the history
  • Loading branch information
mariux committed Jul 26, 2015
1 parent fff762e commit 5fdf6bf
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 741 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ mx_util.o
mxq_group.o
mxqdump.o
mxq_job.o
mxq_mysql.o
mxqd.o
mxqsub.o
mxqkill.o
Expand Down
23 changes: 3 additions & 20 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,6 @@ mx_mysql.h += $(mx_util.h)

mx_mxq.h += mx_mxq.h

### mxq_mysql.h --------------------------------------------------------

mxq_mysql.h += mxq_mysql.h
mxq_mysql.h += $(mxq_util.h)

### mxq_util.h ---------------------------------------------------------

mxq_util.h += mxq_util.h
Expand Down Expand Up @@ -253,20 +248,10 @@ mxq_log.o: $(mx_log.h)

clean: CLEAN += mxq_log.o

### mxq_mysql.o --------------------------------------------------------

mxq_mysql.o: $(mx_log.h)
mxq_mysql.o: $(mxq_mysql.h)
mxq_mysql.o: $(mxq_util.h)
mxq_mysql.o: CFLAGS += $(CFLAGS_MYSQL)

clean: CLEAN += mxq_mysql.o

### mxqdump.o ---------------------------------------------------

mxqdump.o: $(mx_log.h)
mxqdump.o: $(mxq_util.h)
mxqdump.o: $(mxq_mysql.h)
mxqdump.o: $(mx_getopt.h)
mxqdump.o: CFLAGS += $(CFLAGS_MYSQL)

Expand Down Expand Up @@ -297,7 +282,7 @@ clean: CLEAN += mxq_util.o

mxq_group.o: $(mx_log.h)
mxq_group.o: $(mxq_group.h)
mxq_group.o: $(mxq_mysql.h)
mxq_group.o: $(mx_mysql.h)
mxq_group.o: CFLAGS += $(CFLAGS_MYSQL)

clean: CLEAN += mxq_group.o
Expand All @@ -308,7 +293,7 @@ mxq_job.o: $(mx_util.h)
mxq_job.o: $(mx_log.h)
mxq_job.o: $(mxq_job.h)
mxq_job.o: $(mxq_group.h)
mxq_job.o: $(mxq_mysql.h)
mxq_job.o: $(mx_mysql.h)
mxq_job.o: CFLAGS += $(CFLAGS_MYSQL)

clean: CLEAN += mxq_job.o
Expand All @@ -322,7 +307,7 @@ mxqd.o: $(mx_log.h)
mxqd.o: $(mxqd.h)
mxqd.o: $(mxq_group.h)
mxqd.o: $(mxq_job.h)
mxqd.o: $(mxq_mysql.h)
mxqd.o: $(mx_mysql.h)
mxqd.o: CFLAGS += $(CFLAGS_MYSQL)
mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_PATH)
mxqd.o: CFLAGS += -Wno-unused-but-set-variable
Expand Down Expand Up @@ -355,7 +340,6 @@ mxqd: mx_getopt.o
mxqd: mxq_group.o
mxqd: mxq_job.o
mxqd: mxq_util.o
mxqd: mxq_mysql.o
mxqd: mx_mysql.o
mxqd: LDLIBS += $(LDLIBS_MYSQL)

Expand Down Expand Up @@ -388,7 +372,6 @@ mxqdump: mx_log.o
mxqdump: mx_mysql.o
mxqdump: mxq_group.o
mxqdump: mxq_job.o
mxqdump: mxq_mysql.o
mxqdump: mxq_util.o
mxqdump: mx_util.o
mxqdump: mx_getopt.o
Expand Down
15 changes: 10 additions & 5 deletions mx_mysql.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@

#define mx__mysql_log_emerg(mysql) mx__mysql_log(emerg, (mysql))
#define mx__mysql_log_err(mysql) mx__mysql_log(err, (mysql))
#define mx__mysql_log_warning(mysql) mx__mysql_log(warning, (mysql))
#define mx__mysql_log_info(mysql) mx__mysql_log(info, (mysql))
#define mx__mysql_log_debug(mysql) mx__mysql_log(debug, (mysql))

#define mx__mysql_stmt_log_emerg(stmt) mx__mysql_stmt_log(emerg, (stmt))
#define mx__mysql_stmt_log_err(stmt) mx__mysql_stmt_log(err, (stmt))
#define mx__mysql_stmt_log_warning(stmt) mx__mysql_stmt_log(warning, (stmt))
#define mx__mysql_stmt_log_info(stmt) mx__mysql_stmt_log(info, (stmt))
#define mx__mysql_stmt_log_debug(stmt) mx__mysql_stmt_log(debug, (stmt))

Expand Down Expand Up @@ -910,8 +912,9 @@ int mx_mysql_connect_forever_sec(struct mx_mysql **mysql, unsigned int seconds)
int res;

while ((res = mx_mysql_connect(mysql)) < 0) {
mx__mysql_log_info(*mysql);
mx__mysql_log_warning(*mysql);
mx_mysql_assert_usage_ok(res);
mx_log_warning("mx_mysql_connect() failed: %m - retrying (forever) in %d second(s).", seconds);
mx_sleep(seconds);
}

Expand Down Expand Up @@ -1014,8 +1017,8 @@ int mx_mysql_statement_init(struct mx_mysql *mysql, struct mx_mysql_stmt **stmt)
if (res != -ENOMEM)
return res;

mx_log_debug("mx__mysql_stmt_init() failed: %m - retrying (forever) in %d second(s).", MX_CALLOC_FAIL_WAIT_DEFAULT);
mx_sleep(MX_CALLOC_FAIL_WAIT_DEFAULT);
mx_log_debug("mx__mysql_stmt_init() failed: %m - retrying (forever) in %d second(s).", MX_MYSQL_FAIL_WAIT_DEFAULT);
mx_sleep(MX_MYSQL_FAIL_WAIT_DEFAULT);
} while (1);

*stmt = s;
Expand Down Expand Up @@ -1101,7 +1104,7 @@ int mx_mysql_statement_fetch(struct mx_mysql_stmt *stmt)
}

res = mx__mysql_stmt_fetch(stmt);
if (res == -ENOENT || res == 0)
if (res == -ENOENT)
return 0;

if (res < 0 && res != -ERANGE) {
Expand Down Expand Up @@ -1199,7 +1202,6 @@ int mx_mysql_bind_cleanup(struct mx_mysql_bind *bind)
{
mx_assert_return_minus_errno(bind, EINVAL);

mx_assert_return_minus_errno(bind->type != MX_MYSQL_BIND_TYPE_UNKNOWN, EBADF);

mx_free_null(bind->bind);
mx_free_null(bind->data);
Expand Down Expand Up @@ -1329,6 +1331,9 @@ struct mx_mysql_stmt *mx_mysql_statement_prepare_with_bindings(struct mx_mysql *
return stmt;
};

if (res < 0)
mx__mysql_stmt_log_warning(stmt);

mx_mysql_statement_close(&stmt);

return NULL;
Expand Down
135 changes: 34 additions & 101 deletions mxq_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

#include "mxq_group.h"
#include "mxq_job.h"
#include "mxq_mysql.h"
#include "mxq_util.h"
#include "mx_mysql.h"

#define GROUP_FIELDS_CNT 26
Expand Down Expand Up @@ -137,69 +137,6 @@ static int bind_result_group_fields(struct mx_mysql_bind *result, struct mxq_gro
return res;
}

static inline int mxq_group_bind_results(MYSQL_BIND *bind, struct mxq_group *g)
{
memset(bind, 0, sizeof(*bind)*MXQ_GROUP_COL__END);

MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_ID, &g->group_id);
MXQ_MYSQL_BIND_VARSTR(bind, MXQ_GROUP_COL_GROUP_NAME, &g->_group_name_length);
MXQ_MYSQL_BIND_UINT8(bind, MXQ_GROUP_COL_GROUP_STATUS, &g->group_status);
MXQ_MYSQL_BIND_UINT16(bind, MXQ_GROUP_COL_GROUP_PRIORITY, &g->group_priority);

MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_USER_UID, &g->user_uid);
MXQ_MYSQL_BIND_VARSTR(bind, MXQ_GROUP_COL_USER_NAME, &g->_user_name_length);
MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_USER_GID, &g->user_gid);
MXQ_MYSQL_BIND_VARSTR(bind, MXQ_GROUP_COL_USER_GROUP, &g->_user_group_length);

MXQ_MYSQL_BIND_VARSTR(bind, MXQ_GROUP_COL_JOB_COMMAND, &g->_job_command_length);

MXQ_MYSQL_BIND_UINT16(bind, MXQ_GROUP_COL_JOB_THREADS, &g->job_threads);
MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_JOB_MEMORY, &g->job_memory);
MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_JOB_TIME, &g->job_time);

MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS, &g->group_jobs);
MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS_RUNNING, &g->group_jobs_running);
MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS_FINISHED, &g->group_jobs_finished);
MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS_FAILED, &g->group_jobs_failed);
MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS_CANCELLED, &g->group_jobs_cancelled);
MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_JOBS_UNKNOWN, &g->group_jobs_unknown);

MXQ_MYSQL_BIND_UINT64(bind, MXQ_GROUP_COL_GROUP_SLOTS_RUNNING, &g->group_slots_running);

MXQ_MYSQL_BIND_INT32(bind, MXQ_GROUP_COL_STATS_MAX_MAXRSS, &g->stats_max_maxrss);
MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_STATS_MAX_UTIME_SEC, &g->stats_max_utime.tv_sec);
MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_STATS_MAX_STIME_SEC, &g->stats_max_stime.tv_sec);
MXQ_MYSQL_BIND_UINT32(bind, MXQ_GROUP_COL_STATS_MAX_REAL_SEC, &g->stats_max_real.tv_sec);

return 1;
}

static int mxq_group_fetch_results(MYSQL_STMT *stmt, MYSQL_BIND *bind, struct mxq_group *g)
{
int res;

memset(g, 0, sizeof(*g));

res = mxq_mysql_stmt_fetch_row(stmt);
if (!res)
return 0;

res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_GROUP_COL_GROUP_NAME, &(g->group_name), g->_group_name_length);
if (!res)
return 0;
res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_GROUP_COL_USER_NAME, &(g->user_name), g->_user_name_length);
if (!res)
return 0;
res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_GROUP_COL_USER_GROUP, &(g->user_group), g->_user_group_length);
if (!res)
return 0;
res = mxq_mysql_stmt_fetch_string(stmt, bind, MXQ_GROUP_COL_JOB_COMMAND, &(g->job_command), g->_job_command_length);
if (!res)
return 0;

return 1;
}

void mxq_group_free_content(struct mxq_group *g)
{
free_null(g->group_name);
Expand Down Expand Up @@ -394,6 +331,39 @@ int mxq_load_active_groups_for_user(struct mx_mysql *mysql, struct mxq_group **m
return res;
}

int mxq_load_active_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups)
{
int res;
struct mxq_group *groups = NULL;
struct mxq_group g = {0};
struct mx_mysql_bind result = {0};

assert(mysql);
assert(mxq_groups);

*mxq_groups = NULL;

char *query =
"SELECT"
GROUP_FIELDS
" FROM mxq_group"
" WHERE (group_jobs_inq > 0 OR group_jobs_running > 0)"
" ORDER BY user_name, group_mtime"
" LIMIT 1000";

res = bind_result_group_fields(&result, &g);
assert(res == 0);

res = mx_mysql_do_statement(mysql, query, NULL, &result, &g, (void **)&groups, sizeof(*groups));
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}

*mxq_groups = groups;
return res;
}

int mxq_load_running_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups)
{
int res;
Expand Down Expand Up @@ -465,40 +435,3 @@ int mxq_load_running_groups_for_user(struct mx_mysql *mysql, struct mxq_group **
return res;
}

int mxq_group_load_active_groups(MYSQL *mysql, struct mxq_group **mxq_group)
{
MYSQL_STMT *stmt;
MYSQL_BIND result[MXQ_GROUP_COL__END];
struct mxq_group g;
struct mxq_group *groups;
char *query;
int cnt;

*mxq_group = NULL;

mxq_group_bind_results(result, &g);

query = "SELECT " MXQ_GROUP_FIELDS
" FROM mxq_group"
" WHERE group_jobs-group_jobs_finished-group_jobs_failed-group_jobs_cancelled-group_jobs_unknown > 0"
" ORDER BY user_uid, group_priority DESC";

stmt = mxq_mysql_stmt_do_query(mysql, query, MXQ_GROUP_COL__END, NULL, result);
if (!stmt) {
mx_log_err("mxq_mysql_stmt_do_query(mysql=%p, stmt_str=\"%s\", field_count=%d, param=%p, result=%p)", mysql, query, MXQ_GROUP_COL__END, NULL, result);
return 0;
}

cnt = 0;
groups = NULL;
while (mxq_group_fetch_results(stmt, result, &g)) {
groups = realloc_forever(groups, sizeof(*groups)*(cnt+1));
memcpy(groups+cnt, &g, sizeof(*groups));
cnt++;
}

*mxq_group = groups;

mysql_stmt_close(stmt);
return cnt;
}
19 changes: 9 additions & 10 deletions mxq_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,18 @@ struct mxq_group {
#define MXQ_GROUP_STATUS_OK 0
#define MXQ_GROUP_STATUS_CANCELLED 99

void mxq_group_free_content(struct mxq_group *g);

inline uint64_t mxq_group_jobs_done(struct mxq_group *g);
inline uint64_t mxq_group_jobs_active(struct mxq_group *g);
inline uint64_t mxq_group_jobs_inq(struct mxq_group *g);

int mxq_load_group(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t group_id);
int mxq_load_all_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups);
int mxq_load_all_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid);
int mxq_load_active_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid);
int mxq_load_running_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups);
int mxq_load_running_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid);

int mxq_group_load_active_groups(MYSQL *mysql, struct mxq_group **mxq_group);
void mxq_group_free_content(struct mxq_group *g);
int mxq_group_update_status_cancelled(MYSQL *mysql, struct mxq_group *group);
int mxq_load_group(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t group_id);
int mxq_load_all_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups);
int mxq_load_active_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups);
int mxq_load_all_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid);
int mxq_load_active_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid);
int mxq_load_running_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups);
int mxq_load_running_groups_for_user(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t user_uid);

#endif
Loading

0 comments on commit 5fdf6bf

Please sign in to comment.