Skip to content

Commit

Permalink
Merge branch 'fixes'
Browse files Browse the repository at this point in the history
* fixes:
  mxqdctl-hostconfig: add 'quit' and usage
  fix inline declarations for C99 (gcc 5)
  mxqd: update group_jobs_running on transition from RUNNING or LOADED to UNKNOWN
  mxqd_control: insert new groups at the end of the users group list
  mysql/trigger: remove trigger for data we now maintain by mxqd
  web: limit group table display to seven days
  web: make meaning of values more clear in daemon table header
  web: remove useless columns from server table
  mxqd: update statistics in daemon table when jobs are started and stopped
  mxqd: update daemon statistics after reload
  mxq_job: store host_slots when we set status to ASSIGNED
  • Loading branch information
donald committed May 24, 2016
2 parents 21f7744 + 4fe98d4 commit adb45dc
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 75 deletions.
12 changes: 6 additions & 6 deletions mx_mysql.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ static inline int mx__mysql_errno(struct mx_mysql *mysql)
return (int)error;
}

inline const char *mx__mysql_error(struct mx_mysql *mysql)
extern inline const char *mx__mysql_error(struct mx_mysql *mysql)
{
mx_assert_return_NULL(mysql, EINVAL);
mx_assert_return_NULL(mysql->mysql, EBADF);
Expand All @@ -71,7 +71,7 @@ inline const char *mx__mysql_error(struct mx_mysql *mysql)
return mysql_error(mysql->mysql);
}

inline const char *mx__mysql_sqlstate(struct mx_mysql *mysql)
extern inline const char *mx__mysql_sqlstate(struct mx_mysql *mysql)
{
mx_assert_return_NULL(mysql, EINVAL);
mx_assert_return_NULL(mysql->mysql, EBADF);
Expand Down Expand Up @@ -688,7 +688,7 @@ static int mx__mysql_library_end(void) {

/**********************************************************************/

static inline int _mx_mysql_bind_integer(struct mx_mysql_bind *b, unsigned int index, void *value, int type, int is_unsigned)
extern inline int _mx_mysql_bind_integer(struct mx_mysql_bind *b, unsigned int index, void *value, int type, int is_unsigned)
{
mx_assert_return_minus_errno(b, EINVAL);
mx_assert_return_minus_errno(value, EINVAL);
Expand Down Expand Up @@ -1230,7 +1230,7 @@ int mx_mysql_statement_field_count(struct mx_mysql_stmt *stmt)
return mx__mysql_stmt_field_count(stmt);
}

inline int mx_mysql_stmt_field_count_set(struct mx_mysql_stmt *stmt)
extern inline int mx_mysql_stmt_field_count_set(struct mx_mysql_stmt *stmt)
{
mx_assert_return_minus_errno(stmt, EINVAL);
mx_assert_return_minus_errno(stmt->stmt, EBADF);
Expand All @@ -1251,7 +1251,7 @@ inline int mx_mysql_stmt_field_count_get(struct mx_mysql_stmt *stmt, unsigned lo
return 0;
}

inline int mx_mysql_stmt_param_count_set(struct mx_mysql_stmt *stmt)
extern inline int mx_mysql_stmt_param_count_set(struct mx_mysql_stmt *stmt)
{
mx_assert_return_minus_errno(stmt, EINVAL);
mx_assert_return_minus_errno(stmt->stmt, EBADF);
Expand Down Expand Up @@ -1504,7 +1504,7 @@ int mx_mysql_statement_close_no_bind_cleanup(struct mx_mysql_stmt **stmt)
return 0;
}

inline int mx_mysql_bind_integer(struct mx_mysql_bind *b, unsigned int index, void *value, int type, int is_unsigned)
extern inline int mx_mysql_bind_integer(struct mx_mysql_bind *b, unsigned int index, void *value, int type, int is_unsigned)
{
int res;

Expand Down
41 changes: 41 additions & 0 deletions mxq_daemon.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,47 @@ int mxq_daemon_set_status(struct mx_mysql *mysql, struct mxq_daemon *daemon, uin
return res;
}

int mxq_daemon_update_statistics(struct mx_mysql *mysql, struct mxq_daemon *daemon)
{
assert(daemon);
assert(daemon->daemon_id);

struct mx_mysql_bind param = {0};
char *query;
int idx;
int res;

query = "UPDATE"
" mxq_daemon"
" SET"
" mtime = NULL,"
" daemon_jobs_running = ?,"
" daemon_slots_running = ?,"
" daemon_threads_running = ?,"
" daemon_memory_used = ?"
" WHERE daemon_id = ?";

res = mx_mysql_bind_init_param(&param, 5);
assert(res == 0);

idx = 0;
res = 0;
res += mx_mysql_bind_var(&param, idx++, uint32, &(daemon->daemon_jobs_running));
res += mx_mysql_bind_var(&param, idx++, uint32, &(daemon->daemon_slots_running));
res += mx_mysql_bind_var(&param, idx++, uint32, &(daemon->daemon_threads_running));
res += mx_mysql_bind_var(&param, idx++, uint64, &(daemon->daemon_memory_used));
res += mx_mysql_bind_var(&param, idx++, uint32, &(daemon->daemon_id));
assert(res == 0);

res = mx_mysql_do_statement_noresult_retry_on_fail(mysql, query, &param);
if (res < 0) {
mx_log_err("mx_mysql_do_statement(): %m");
return res;
}

return res;
}

int mxq_load_all_daemons(struct mx_mysql *mysql, struct mxq_daemon **daemons)
{
struct mxq_daemon *daemons_tmp = NULL;
Expand Down
1 change: 1 addition & 0 deletions mxq_daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct mxq_daemon {
void mxq_daemon_free_content(struct mxq_daemon *daemon);
int mxq_daemon_register(struct mx_mysql *mysql, struct mxq_daemon *daemon);
int mxq_daemon_mark_crashed(struct mx_mysql *mysql, struct mxq_daemon *daemon);
int mxq_daemon_update_statistics(struct mx_mysql *mysql, struct mxq_daemon *daemon);
int mxq_daemon_shutdown(struct mx_mysql *mysql, struct mxq_daemon *daemon);
int mxq_daemon_set_status(struct mx_mysql *mysql, struct mxq_daemon *daemon, uint8_t status);
#endif
6 changes: 3 additions & 3 deletions mxq_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void mxq_group_free_content(struct mxq_group *g)
}


inline uint64_t mxq_group_jobs_done(struct mxq_group *g)
extern inline uint64_t mxq_group_jobs_done(struct mxq_group *g)
{
uint64_t done = 0;

Expand All @@ -122,7 +122,7 @@ inline uint64_t mxq_group_jobs_done(struct mxq_group *g)
return done;
}

inline uint64_t mxq_group_jobs_active(struct mxq_group *g)
extern inline uint64_t mxq_group_jobs_active(struct mxq_group *g)
{
uint64_t active;

Expand All @@ -136,7 +136,7 @@ inline uint64_t mxq_group_jobs_active(struct mxq_group *g)
return active;
}

inline uint64_t mxq_group_jobs_inq(struct mxq_group *g)
extern inline uint64_t mxq_group_jobs_inq(struct mxq_group *g)
{
uint64_t inq;

Expand Down
6 changes: 3 additions & 3 deletions mxq_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ struct mxq_group {

void mxq_group_free_content(struct mxq_group *g);

inline uint64_t mxq_group_jobs_done(struct mxq_group *g);
inline uint64_t mxq_group_jobs_active(struct mxq_group *g);
inline uint64_t mxq_group_jobs_inq(struct mxq_group *g);
uint64_t mxq_group_jobs_done(struct mxq_group *g);
uint64_t mxq_group_jobs_active(struct mxq_group *g);
uint64_t mxq_group_jobs_inq(struct mxq_group *g);

int mxq_load_group(struct mx_mysql *mysql, struct mxq_group **mxq_groups, uint64_t group_id);
int mxq_load_all_groups(struct mx_mysql *mysql, struct mxq_group **mxq_groups);
Expand Down
11 changes: 7 additions & 4 deletions mxq_job.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **
return res;
}

int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon)
int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job)
{
struct mx_mysql_bind param = {0};
int res;
Expand All @@ -311,6 +311,7 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
" SET"
" daemon_id = ?,"
" host_hostname = ?,"
" host_slots = ?, "
" server_id = ?,"
" job_status = " status_str(MXQ_JOB_STATUS_ASSIGNED)
" WHERE group_id = ?"
Expand All @@ -324,13 +325,14 @@ int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_i
" job_id"
" LIMIT 1";

res = mx_mysql_bind_init_param(&param, 4);
res = mx_mysql_bind_init_param(&param, 5);
assert(res == 0);

idx = 0;
res = 0;
res += mx_mysql_bind_var(&param, idx++, uint32, &daemon->daemon_id);
res += mx_mysql_bind_var(&param, idx++, string, &daemon->hostname);
res += mx_mysql_bind_var(&param, idx++, uint64, &slots_per_job);
res += mx_mysql_bind_var(&param, idx++, string, &daemon->daemon_name);
res += mx_mysql_bind_var(&param, idx++, uint64, &group_id);
assert(res == 0);
Expand Down Expand Up @@ -678,7 +680,8 @@ int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mx
return res;
}

int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *job, uint64_t group_id, struct mxq_daemon *daemon)
int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *job, uint64_t group_id, struct mxq_daemon *daemon,
unsigned long slots_per_job)
{
int res;
struct mxq_job *jobs_tmp = NULL;
Expand All @@ -700,7 +703,7 @@ int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *j
break;
}

res = mxq_assign_job_from_group_to_daemon(mysql, group_id, daemon);
res = mxq_assign_job_from_group_to_daemon(mysql, group_id, daemon, slots_per_job);
if (res < 0) {
mx_log_err(" group_id=%lu :: mxq_assign_job_from_group_to_daemon(): %m", group_id);
return 0;
Expand Down
4 changes: 2 additions & 2 deletions mxq_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ void mxq_job_free_content(struct mxq_job *j);
int mxq_load_job(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, uint64_t job_id);
int mxq_load_jobs_in_group(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp);
int mxq_load_jobs_in_group_with_status(struct mx_mysql *mysql, struct mxq_job **mxq_jobs, struct mxq_group *grp, uint64_t job_status);
int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon);
int mxq_assign_job_from_group_to_daemon(struct mx_mysql *mysql, uint64_t group_id, struct mxq_daemon *daemon, unsigned long slots_per_job);
int mxq_unassign_jobs_of_server(struct mx_mysql *mysql, struct mxq_daemon *daemon);
int mxq_set_job_status_loaded_on_server(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_running(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_exited(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_set_job_status_unknown(struct mx_mysql *mysql, struct mxq_job *job);
int mxq_job_set_tmpfilenames(struct mxq_group *g, struct mxq_job *j);
int mxq_load_job_from_group_assigned_to_daemon(struct mx_mysql *mysql, struct mxq_job **jobs_result, uint64_t group_id, struct mxq_daemon *daemon);
int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon);
int mxq_load_job_from_group_for_daemon(struct mx_mysql *mysql, struct mxq_job *mxqjob, uint64_t group_id, struct mxq_daemon *daemon,unsigned long slots_per_job);
int mxq_load_jobs_running_on_server(struct mx_mysql *mysql, struct mxq_job **jobs_result, struct mxq_daemon *daemon);
#endif
38 changes: 35 additions & 3 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,23 @@ int write_pid_to_file(char *fname)
return 0;
}

/*
 * Copy the server's running counters (jobs, slots, threads, memory) into
 * its embedded daemon record and store them through
 * mxq_daemon_update_statistics().
 *
 * Returns whatever mxq_daemon_update_statistics() returns.
 */
int server_update_daemon_statistics(struct mxq_server *server)
{
    assert(server);
    assert(server->mysql);

    struct mxq_daemon *d = &server->daemon;

    d->daemon_jobs_running    = server->jobs_running;
    d->daemon_slots_running   = server->slots_running;
    d->daemon_threads_running = server->threads_running;
    d->daemon_memory_used     = server->memory_used;

    return mxq_daemon_update_statistics(server->mysql, d);
}

static int cpuset_init(struct mxq_server *server)
{
int res;
Expand Down Expand Up @@ -1131,7 +1148,7 @@ unsigned long start_job(struct mxq_group_list *glist)
group = &glist->group;
job = &_mxqjob;

res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon);
res = mxq_load_job_from_group_for_daemon(server->mysql, job, group->group_id, daemon, glist->slots_per_job);
if (!res) {
return 0;
}
Expand Down Expand Up @@ -1191,6 +1208,10 @@ unsigned long start_job(struct mxq_group_list *glist)
jlist = group_list_add_job(glist, job);
assert(jlist);

res = server_update_daemon_statistics(server);
if (res < 0)
mx_log_err("start_job: failed to update daemon instance statistics: %m");

mx_log_info(" job=%s(%d):%lu:%lu :: added running job to watch queue.",
group->user_name, group->user_uid, group->group_id, job->job_id);

Expand Down Expand Up @@ -1719,6 +1740,7 @@ static int job_is_lost(struct mxq_server *server,struct mxq_group *group, struct

mxq_set_job_status_unknown(server->mysql, job);
group->group_jobs_unknown++;
group->group_jobs_running--;

rename_outfiles(group, job);

Expand Down Expand Up @@ -1814,6 +1836,9 @@ static int fspool_process_file(struct mxq_server *server,char *filename, uint64_

job_has_finished(server, group, jlist);
unlink(filename);
res = server_update_daemon_statistics(server);
if (res < 0)
mx_log_err("recover: failed to update daemon instance statistics: %m");
return(0);
}

Expand Down Expand Up @@ -1955,6 +1980,9 @@ static int lost_scan(struct mxq_server *server)
return res;
count+=res;
} while (res>0);
res = server_update_daemon_statistics(server);
if (res < 0)
mx_log_err("lost_scan: failed to update daemon instance statistics: %m");
return count;
}

Expand Down Expand Up @@ -2120,7 +2148,7 @@ int load_running_groups(struct mxq_server *server)
grp_cnt = mxq_load_running_groups_for_user(server->mysql, &grps, getuid());

for (i=0, total=0; i < grp_cnt; i++) {
group = &grps[grp_cnt-i-1];
group = &grps[i];

passwd = getpwnam(group->user_name);
if (!passwd) {
Expand Down Expand Up @@ -2198,7 +2226,11 @@ int recover_from_previous_crash(struct mxq_server *server)
if (res>0)
mx_log_warning("recover: %d jobs vanished from the system",res);

return 0;
res = server_update_daemon_statistics(server);
if (res < 0)
mx_log_err("recover: failed to update daemon instance statistics: %m");

return res;
}

/**********************************************************************/
Expand Down
20 changes: 18 additions & 2 deletions mxqd_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,23 @@ struct mxq_job_list *group_list_add_job(struct mxq_group_list *glist, struct mxq
return jlist;
}

/*
 * Locate the end of a user's group list.
 *
 * Returns the address of the link that currently holds NULL: &ulist->groups
 * when the list is empty, otherwise the `next` member of the last element.
 * Storing an element through the returned pointer appends it to the list.
 */
static struct mxq_group_list **group_list_tail_ptr(struct mxq_user_list *ulist)
{
    struct mxq_group_list **link = &ulist->groups;

    for (; *link; link = &(*link)->next) {
        /* advance until the link itself is NULL */
    }

    return link;
}

/*
 * Create a new mxq_group_list element from a mxq_group and append it to the
 * end of the user's group list.
 * Updates the user and server group counters.
 */
struct mxq_group_list *_user_list_add_group(struct mxq_user_list *ulist, struct mxq_group *group)
{
struct mxq_group_list *glist;
Expand All @@ -332,8 +349,7 @@ struct mxq_group_list *_user_list_add_group(struct mxq_user_list *ulist, struct

glist->user = ulist;

glist->next = ulist->groups;
ulist->groups = glist;
*group_list_tail_ptr(ulist)=glist;

ulist->group_cnt++;
server->group_cnt++;
Expand Down
26 changes: 25 additions & 1 deletion mxqdctl-hostconfig.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ function stop_all_started()
done
}

# Send SIGQUIT to every mxqd that has a pidfile matching ${pidfilebase}*
# owned by the calling user. Pidfiles owned by other users are skipped.
function quit_all_started()
{
    for pidfile in ${pidfilebase}* ; do
        # an unmatched glob is handed to the loop literally; skip it instead
        # of running stat/cat on a path that does not exist
        if [ ! -e "${pidfile}" ] ; then
            continue
        fi

        ouid=$(stat --format "%u" "${pidfile}")
        if [ "${UID}" != "${ouid}" ] ; then
            continue
        fi

        # quote the path so a pidfile name containing whitespace is not split
        pid=$(cat "${pidfile}")
        if [ -z "${pid}" ] ; then
            # without a pid, kill would be called with no target at all
            echo "${pidfile}: no pid found, skipping" >&2
            continue
        fi

        echo "${pidfile}: sending signal SIGQUIT to ${pid}"
        kill -QUIT "${pid}"
    done
}

function reload_all_started()
{
for pidfile in ${pidfilebase}* ; do
Expand Down Expand Up @@ -93,11 +106,22 @@ case "${BASH_ARGV[0]}" in
kill)
kill_all_started
;;
quit)
quit_all_started
;;
reload|restart)
reload_all_started
;;
stopall)
killall -u "${USER}" "${mxqd}"
;;
*)
echo "usage $0 CMD"
echo " start : start mxqd (if configured by hostconfig)"
echo " stop : tell mxqd to stop accepting new jobs, wait for running jobs, exit"
echo " kill : tell mxqd to stop accepting new jobs, kill and wait for running jobs, exit"
echo " quit : tell mxqd to exit (leave jobs running)"
echo " reload|restart : tell mxqd to re-exec itself, leave jobs running"
echo " stopall : as 'stop', but to any mxqd owned by calling user"
;;
esac

Loading

0 comments on commit adb45dc

Please sign in to comment.