diff --git a/mxqd.c b/mxqd.c index 47268b31..e6b78f54 100644 --- a/mxqd.c +++ b/mxqd.c @@ -1185,6 +1185,8 @@ int user_process(struct mxq_group_list *glist, struct mxq_job *job) return res; } +static const char REAPER_PNAME[] = "mxqd reaper"; + int reaper_process(struct mxq_server *server,struct mxq_group_list *glist, struct mxq_job *job) { pid_t pid; struct rusage rusage; @@ -1202,9 +1204,15 @@ int reaper_process(struct mxq_server *server,struct mxq_group_list *glist, struc group = &glist->group; + res = prctl(PR_SET_NAME, REAPER_PNAME, NULL, NULL, NULL); + if (res < 0) { + mx_log_err("reaper_process set name: %m"); + return res; + } + res = setsid(); if (res < 0) { - mx_log_warning("reaper_process setsid: %m"); + mx_log_err("reaper_process setsid: %m"); return res; } @@ -1326,8 +1334,6 @@ unsigned long start_job(struct mxq_group_list *glist) if (!res) { return 0; } - mx_log_info(" job=%s(%d):%lu:%lu :: new job loaded.", - group->user_name, group->user_uid, group->group_id, job->job_id); if (group->job_tmpdir_size > 0) { mx_mysql_disconnect(server->mysql); @@ -1362,8 +1368,6 @@ unsigned long start_job(struct mxq_group_list *glist) mx_free_null(job->host_cpu_set_str); job->host_cpu_set_str = mx_cpuset_to_str(&job->host_cpu_set); - mx_log_info("job assigned cpus: [%s]", job->host_cpu_set_str); - mx_mysql_disconnect(server->mysql); pid = fork(); @@ -1375,14 +1379,6 @@ unsigned long start_job(struct mxq_group_list *glist) } else if (pid == 0) { job->host_pid = getpid(); - mx_log_info(" job=%s(%d):%lu:%lu host_pid=%d pgrp=%d :: new child process forked.", - group->user_name, - group->user_uid, - group->group_id, - job->job_id, - job->host_pid, - getpgrp()); - mx_log_debug("starting reaper process."); mx_mysql_finish(&server->mysql); @@ -1417,8 +1413,19 @@ unsigned long start_job(struct mxq_group_list *glist) if (res < 0) mx_log_err("start_job: failed to update daemon instance statistics: %m"); - mx_log_info(" job=%s(%d):%lu:%lu :: added running job to watch queue.", - group->user_name, group->user_uid, group->group_id, job->job_id); + mx_log_info(" job=%s(%d):%lu:%lu :: started. pid=%d", + group->user_name, group->user_uid, group->group_id, job->job_id, pid); + + /* The group counts in the database were updated by the sql triggers when + * we set the job from ASSIGNED to LOADED. We would pick that up in the + * next round of the main loop. Update the in-memory counts right now so + * that we don't try to start a new job when there are no INQ jobs left. + * This avoids a "No matching job found - maybe another server was a bit + * faster" warning when we started the last INQ jobs from a group. + */ + + group->group_jobs_inq--; + group->group_jobs_running++; return 1; } @@ -2117,6 +2124,7 @@ static int fspool_process_file(struct mxq_server *server,char *filename, uint64_ struct mxq_job_list *jlist; struct mxq_job *job; struct mxq_group *group; + int slots_returned = 0; in=fopen(filename,"r"); if (!in) { @@ -2183,12 +2191,12 @@ static int fspool_process_file(struct mxq_server *server,char *filename, uint64_ job->stats_status = status; job->stats_rusage = rusage; - job_has_finished(server, group, jlist); + slots_returned = job_has_finished(server, group, jlist); unlink(filename); res = server_update_daemon_statistics(server); if (res < 0) mx_log_err("recover: failed to update daemon instance statistics: %m"); - return(0); + return(slots_returned); } static int fspool_is_valid_name_parse(const char *name, unsigned long long int *job_id) { @@ -2222,6 +2230,7 @@ static int fspool_scan(struct mxq_server *server) { int res; unsigned long long int job_id; char *filename; + int slots_returned = 0; entries=scandir(server->finished_jobsdir,&namelist,&fspool_is_valid_name,&alphasort); @@ -2234,15 +2243,15 @@ static int fspool_scan(struct mxq_server *server) { mx_asprintf_forever(&filename,"%s/%s",server->finished_jobsdir,namelist[i]->d_name); fspool_is_valid_name_parse(namelist[i]->d_name,&job_id); res=fspool_process_file(server,filename,job_id); - if (res==0) { - cnt++; + if (res>0) { + slots_returned += res; } free(namelist[i]); free(filename); } free(namelist); - return cnt; + return slots_returned; } static int file_exists(char *name) { @@ -2661,7 +2670,6 @@ static void process_signal(struct mxq_server *server,int sig,int extra) } break; case SIGCHLD: - mx_log_info("received sigchld"); break; default: mx_log_warning("received signal %d (unexpected!)",sig); @@ -2806,7 +2814,7 @@ int main(int argc, char *argv[]) slots_returned += fspool_scan(server); if (slots_returned) - mx_log_info("slots_returned=%lu :: Main Loop freed %lu slots.", slots_returned, slots_returned); + mx_log_info("Main loop freed %lu slots.", slots_returned); group_cnt = load_running_groups(server); if (group_cnt) diff --git a/mxqd_control.c b/mxqd_control.c index 138963c8..a06029f9 100644 --- a/mxqd_control.c +++ b/mxqd_control.c @@ -530,11 +530,6 @@ int server_remove_orphaned_groups(struct mxq_server *server) ulist->groups = gnext; } - mx_log_info("group=%s(%d):%lu : Removing orphaned group.", - group->user_name, - group->user_uid, - group->group_id); - ulist->group_cnt--; ulist->global_slots_running -= glist->global_slots_running; ulist->global_threads_running -= glist->global_threads_running; @@ -560,8 +555,6 @@ int server_remove_orphaned_groups(struct mxq_server *server) server->user_cnt--; mx_free_null(ulist); - - mx_log_info("Removed orphaned user. %lu users left.", server->user_cnt); } return cnt; } diff --git a/mxqsub.c b/mxqsub.c index c1c8997e..595a98c7 100644 --- a/mxqsub.c +++ b/mxqsub.c @@ -36,11 +36,6 @@ #include "parser.tab.h" #include "mxq.h" -#define MXQ_TASK_JOB_FORCE_APPEND (1<<0) -#define MXQ_TASK_JOB_FORCE_NEW (1<<1) - -#define MXQ_JOB_STATUS_ACTIVE (1) - #define UINT64_UNSET (uint64_t)(-1) static void print_usage(void)