Skip to content

Commit

Permalink
Merge branches 'fix-mxqkill', 'set-reaper-pname', 'fix-some-log' and …
Browse files Browse the repository at this point in the history
…'cleanup'

* fix-mxqkill:
  mxqd: Remove superfluous "unknown pid returned" warning
  mxqd: Switch to killstate state machine
  mxqd: Set up ppidcache in monitor_jobs
  mxqd: Remove unneeded arguments from killall
  mxqd: Factor out monitor_jobs
  mxqd: Add `killstate` state machine
  ppidcache: Add new module
  mx_util: Add mx_clock_getboottime
  mx_util: Add _mx_cleanup_closedir_
  mx_proc: Add mx_proc_get_parent

* set-reaper-pname:
  mxqd: Give reaper thread fixed name

* fix-some-log:
  Avoid sporadic "No matching job found" warning
  mxq_control: Remove "Remove orphaned ..." messages
  mxqd: Fix "Main Loop freed N slots" message
  mxqd: Remove job startup chatter
  mxqd: Remove "received sigchld" log message

* cleanup:
  mxqd: Change severity of failing setsid to error
  mxqsub: Remove unused defines
  • Loading branch information
donald committed Aug 24, 2021
4 parents 28c431c + f7ba918 + 40258b9 + 907c92c commit 26d8ff8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 34 deletions.
52 changes: 30 additions & 22 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,8 @@ int user_process(struct mxq_group_list *glist, struct mxq_job *job)
return res;
}

static const char REAPER_PNAME[] = "mxqd reaper";

int reaper_process(struct mxq_server *server,struct mxq_group_list *glist, struct mxq_job *job) {
pid_t pid;
struct rusage rusage;
Expand All @@ -1202,9 +1204,15 @@ int reaper_process(struct mxq_server *server,struct mxq_group_list *glist, struc

group = &glist->group;

res = prctl(PR_SET_NAME, REAPER_PNAME, NULL, NULL, NULL);
if (res < 0) {
mx_log_err("reaper_process set name: %m");
return res;
}

res = setsid();
if (res < 0) {
mx_log_warning("reaper_process setsid: %m");
mx_log_err("reaper_process setsid: %m");
return res;
}

Expand Down Expand Up @@ -1326,8 +1334,6 @@ unsigned long start_job(struct mxq_group_list *glist)
if (!res) {
return 0;
}
mx_log_info(" job=%s(%d):%lu:%lu :: new job loaded.",
group->user_name, group->user_uid, group->group_id, job->job_id);

if (group->job_tmpdir_size > 0) {
mx_mysql_disconnect(server->mysql);
Expand Down Expand Up @@ -1362,8 +1368,6 @@ unsigned long start_job(struct mxq_group_list *glist)
mx_free_null(job->host_cpu_set_str);
job->host_cpu_set_str = mx_cpuset_to_str(&job->host_cpu_set);

mx_log_info("job assigned cpus: [%s]", job->host_cpu_set_str);

mx_mysql_disconnect(server->mysql);

pid = fork();
Expand All @@ -1375,14 +1379,6 @@ unsigned long start_job(struct mxq_group_list *glist)
} else if (pid == 0) {
job->host_pid = getpid();

mx_log_info(" job=%s(%d):%lu:%lu host_pid=%d pgrp=%d :: new child process forked.",
group->user_name,
group->user_uid,
group->group_id,
job->job_id,
job->host_pid,
getpgrp());

mx_log_debug("starting reaper process.");
mx_mysql_finish(&server->mysql);

Expand Down Expand Up @@ -1417,8 +1413,19 @@ unsigned long start_job(struct mxq_group_list *glist)
if (res < 0)
mx_log_err("start_job: failed to update daemon instance statistics: %m");

mx_log_info(" job=%s(%d):%lu:%lu :: added running job to watch queue.",
group->user_name, group->user_uid, group->group_id, job->job_id);
mx_log_info(" job=%s(%d):%lu:%lu :: started. pid=%d",
group->user_name, group->user_uid, group->group_id, job->job_id, pid);

/* The group counts in the database were updated by the sql triggers when
* we set the job from ASSIGNED to LOADED. We would pick that up in the
* next round of the main loop. Update the in-memory counts right now so
* that we don't try to start a new job when there are no INQ jobs left.
* This avoids a "No matching job found - maybe another server was a bit
* faster" warning when we started the last INQ jobs from a group.
*/

group->group_jobs_inq--;
group->group_jobs_running++;

return 1;
}
Expand Down Expand Up @@ -2117,6 +2124,7 @@ static int fspool_process_file(struct mxq_server *server,char *filename, uint64_
struct mxq_job_list *jlist;
struct mxq_job *job;
struct mxq_group *group;
int slots_returned = 0;

in=fopen(filename,"r");
if (!in) {
Expand Down Expand Up @@ -2183,12 +2191,12 @@ static int fspool_process_file(struct mxq_server *server,char *filename, uint64_
job->stats_status = status;
job->stats_rusage = rusage;

job_has_finished(server, group, jlist);
slots_returned = job_has_finished(server, group, jlist);
unlink(filename);
res = server_update_daemon_statistics(server);
if (res < 0)
mx_log_err("recover: failed to update daemon instance statistics: %m");
return(0);
return(slots_returned);
}

static int fspool_is_valid_name_parse(const char *name, unsigned long long int *job_id) {
Expand Down Expand Up @@ -2222,6 +2230,7 @@ static int fspool_scan(struct mxq_server *server) {
int res;
unsigned long long int job_id;
char *filename;
int slots_returned = 0;


entries=scandir(server->finished_jobsdir,&namelist,&fspool_is_valid_name,&alphasort);
Expand All @@ -2234,15 +2243,15 @@ static int fspool_scan(struct mxq_server *server) {
mx_asprintf_forever(&filename,"%s/%s",server->finished_jobsdir,namelist[i]->d_name);
fspool_is_valid_name_parse(namelist[i]->d_name,&job_id);
res=fspool_process_file(server,filename,job_id);
if (res==0) {
cnt++;
if (res>0) {
slots_returned += res;
}
free(namelist[i]);
free(filename);
}

free(namelist);
return cnt;
return slots_returned;
}

static int file_exists(char *name) {
Expand Down Expand Up @@ -2661,7 +2670,6 @@ static void process_signal(struct mxq_server *server,int sig,int extra)
}
break;
case SIGCHLD:
mx_log_info("received sigchld");
break;
default:
mx_log_warning("received signal %d (unexpected!)",sig);
Expand Down Expand Up @@ -2806,7 +2814,7 @@ int main(int argc, char *argv[])
slots_returned += fspool_scan(server);

if (slots_returned)
mx_log_info("slots_returned=%lu :: Main Loop freed %lu slots.", slots_returned, slots_returned);
mx_log_info("Main loop freed %lu slots.", slots_returned);

group_cnt = load_running_groups(server);
if (group_cnt)
Expand Down
7 changes: 0 additions & 7 deletions mxqd_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -530,11 +530,6 @@ int server_remove_orphaned_groups(struct mxq_server *server)
ulist->groups = gnext;
}

mx_log_info("group=%s(%d):%lu : Removing orphaned group.",
group->user_name,
group->user_uid,
group->group_id);

ulist->group_cnt--;
ulist->global_slots_running -= glist->global_slots_running;
ulist->global_threads_running -= glist->global_threads_running;
Expand All @@ -560,8 +555,6 @@ int server_remove_orphaned_groups(struct mxq_server *server)

server->user_cnt--;
mx_free_null(ulist);

mx_log_info("Removed orphaned user. %lu users left.", server->user_cnt);
}
return cnt;
}
Expand Down
5 changes: 0 additions & 5 deletions mxqsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@
#include "parser.tab.h"
#include "mxq.h"

#define MXQ_TASK_JOB_FORCE_APPEND (1<<0)
#define MXQ_TASK_JOB_FORCE_NEW (1<<1)

#define MXQ_JOB_STATUS_ACTIVE (1)

#define UINT64_UNSET (uint64_t)(-1)

static void print_usage(void)
Expand Down

0 comments on commit 26d8ff8

Please sign in to comment.