Skip to content

Commit

Permalink
Merge branch 'improvements'
Browse files: browse the repository at this point in the history.
* improvements:
  mxqd: Cleanup server_init()
  mxqd: Cleanup main()
  • Loading branch information
mariux committed Nov 4, 2015
2 parents a502546 + 61f76e1 commit 79db558
Showing 1 changed file with 70 additions and 53 deletions.
123 changes: 70 additions & 53 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])

while ((opt=mx_getopt(&optctl, &i)) != MX_GETOPT_END) {
if (opt == MX_GETOPT_ERROR) {
exit(EX_USAGE);
return -EX_USAGE;
}

switch (opt) {
Expand Down Expand Up @@ -397,16 +397,16 @@ int server_init(struct mxq_server *server, int argc, char *argv[])

case 'V':
mxq_print_generic_version();
exit(EX_USAGE);
return -EX_USAGE;

case 'h':
print_usage();
exit(EX_USAGE);
return -EX_USAGE;

case 'j':
if (mx_strtoul(optctl.optarg, &arg_threads_total) < 0) {
mx_log_err("Invalid argument supplied for option --slots '%s': %m", optctl.optarg);
exit(1);
return -EX_USAGE;
}
break;

Expand All @@ -416,7 +416,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])

if(mx_strtobytes(optctl.optarg, &bytes) < 0) {
mx_log_err("Invalid argument supplied for option --memory '%s': %m", optctl.optarg);
exit(1);
return -EX_USAGE;
}
arg_memory_total = bytes/1024/1024;
}
Expand All @@ -430,7 +430,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])

if(mx_strtobytes(optctl.optarg, &bytes) < 0) {
mx_log_err("Invalid argument supplied for option --max-memory-per-slot '%s': %m", optctl.optarg);
exit(1);
return -EX_USAGE;
}
arg_memory_max = bytes/1024/1024;
}
Expand Down Expand Up @@ -462,7 +462,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])

if (arg_daemonize && arg_nolog) {
mx_log_err("Error while using conflicting options --daemonize and --no-log at once.");
exit(EX_USAGE);
return -EX_USAGE;
}

memset(server, 0, sizeof(*server));
Expand All @@ -475,34 +475,35 @@ int server_init(struct mxq_server *server, int argc, char *argv[])

server->flock = mx_flock(LOCK_EX, "/dev/shm/mxqd.%s.%s.lck", server->hostname, server->server_id);
if (!server->flock) {
return -1;
mx_log_err("mx_flock(/dev/shm/mxqd.%s.%s.lck) failed: %m", server->hostname, server->server_id);
return -EX_UNAVAILABLE;
}

if (!server->flock->locked) {
mx_log_err("MXQ Server '%s' on host '%s' is already running. Exiting.", server->server_id, server->hostname);
exit(2);
return -EX_UNAVAILABLE;
}

mx_asprintf_forever(&server->finished_jobsdir,"%s/%s",MXQ_FINISHED_JOBSDIR,server->server_id);
res=mx_mkdir_p(server->finished_jobsdir,0700);
if (res<0) {
mx_log_err("MAIN: mkdir %s failed: %m. Exiting.",MXQ_FINISHED_JOBSDIR);
exit(EX_IOERR);
return -EX_IOERR;
}

if (arg_daemonize) {
res = daemon(0, 1);
if (res == -1) {
mx_log_err("MAIN: daemon(0, 1) failed: %m. Exiting.");
exit(EX_UNAVAILABLE);
return -EX_OSERR;
}
}

if (arg_pidfile) {
res = write_pid_to_file(arg_pidfile);
if (res < 0) {
mx_log_err("MAIN: pidfile (%s) setup failed: %m. Exiting.", arg_pidfile);
exit(EX_IOERR);
return -EX_IOERR;
}

server->pidfilename = arg_pidfile;
Expand All @@ -511,7 +512,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
res = prctl(PR_SET_CHILD_SUBREAPER, 1);
if (res == -1) {
mx_log_err("MAIN: prctl(PR_SET_CHILD_SUBREAPER) setup failed: %m. Exiting.");
exit(EX_IOERR);
return -EX_OSERR;
}

setup_stdin("/dev/null");
Expand All @@ -524,14 +525,14 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
if (!RUNNING_AS_ROOT)
mx_log_warning("Running mxqd as non-root user.");
mx_log_err("MAIN: can't write to '%s': %m", arg_logdir);
exit(EX_IOERR);
return -EX_IOERR;
}
res = setup_cronolog("/usr/sbin/cronolog", arg_logdir, "mxqd_log", "%Y/mxqd_log-%Y-%m");
if (!res) {
if (!RUNNING_AS_ROOT)
mx_log_warning("Running mxqd as non-root user.");
mx_log_err("MAIN: cronolog setup failed. exiting.");
exit(EX_IOERR);
return -EX_IOERR;
}
}

Expand Down Expand Up @@ -564,7 +565,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
res = cpuset_init(server);
if (res < 0) {
mx_log_err("MAIN: cpuset_init() failed. exiting.");
exit(1);
return -EX_OSERR;
}
server->memory_total = arg_memory_total;
server->memory_max_per_slot = arg_memory_max;
Expand All @@ -581,7 +582,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
if (server->memory_max_per_slot > server->memory_total)
server->memory_max_per_slot = server->memory_total;

return 1;
return 0;
}

static void reset_signals()
Expand Down Expand Up @@ -1398,6 +1399,8 @@ void server_free(struct mxq_server *server)
mx_free_null(server->host_id);
mx_free_null(server->finished_jobsdir);
mx_flock_free(server->flock);

mx_log_finish();
}

void server_close(struct mxq_server *server)
Expand Down Expand Up @@ -2177,7 +2180,8 @@ int main(int argc, char *argv[])
{
int group_cnt;

struct mxq_server server;
struct mxq_server __server;
struct mxq_server *server = &__server;

unsigned long slots_started = 0;
unsigned long slots_returned = 0;
Expand All @@ -2189,10 +2193,10 @@ int main(int argc, char *argv[])

mx_log_level_set(MX_LOG_INFO);

res = server_init(&server, argc, argv);
res = server_init(server, argc, argv);
if (res < 0) {
mx_log_err("MXQ Server: Can't initialize server handle. Exiting.");
exit(1);
server_close(server);
exit(-res);
}

mx_log_info("mxqd - " MXQ_VERSIONFULL);
Expand All @@ -2202,15 +2206,20 @@ int main(int argc, char *argv[])
#ifdef MXQ_DEVELOPMENT
mx_log_warning("DEVELOPMENT VERSION: Do not use in production environments.");
#endif
mx_log_info("hostname=%s server_id=%s :: MXQ server started.", server.hostname, server.server_id);
mx_log_info(" host_id=%s", server.host_id);
mx_log_info("hostname=%s server_id=%s :: MXQ server started.",
server->hostname,
server->server_id);
mx_log_info(" host_id=%s", server->host_id);
mx_log_info("slots=%lu memory_total=%lu memory_avg_per_slot=%.0Lf memory_max_per_slot=%ld :: server initialized.",
server.slots, server.memory_total, server.memory_avg_per_slot, server.memory_max_per_slot);
cpuset_log("cpu set available",&server.cpu_set_available);
server->slots,
server->memory_total,
server->memory_avg_per_slot,
server->memory_max_per_slot);
cpuset_log("cpu set available", &(server->cpu_set_available));

/*** database connect ***/

mx_mysql_connect_forever(&(server.mysql));
mx_mysql_connect_forever(&(server->mysql));

/*** main loop ***/

Expand All @@ -2223,55 +2232,55 @@ int main(int argc, char *argv[])
signal(SIGTTOU, SIG_IGN);
signal(SIGCHLD, no_handler);

res = recover_from_previous_crash(&server);
res = recover_from_previous_crash(server);
if (res < 0) {
mx_log_warning("recover_from_previous_crash() failed. Aborting execution.");
fail = 1;
}

if (server.recoveronly)
if (server->recoveronly)
fail = 1;

while (!global_sigint_cnt && !global_sigterm_cnt && !global_sigquit_cnt && !fail) {
slots_returned = catchall(&server);
slots_returned += fspool_scan(&server);
slots_returned = catchall(server);
slots_returned += fspool_scan(server);

if (slots_returned)
mx_log_info("slots_returned=%lu :: Main Loop freed %lu slots.", slots_returned, slots_returned);

if (slots_started || slots_returned) {
server_dump(&server);
server_dump(server);
slots_started = 0;
}

group_cnt = load_running_groups(&server);
group_cnt = load_running_groups(server);
if (group_cnt)
mx_log_debug("group_cnt=%d :: %d Groups loaded", group_cnt, group_cnt);

killall_cancelled(&server);
killall_over_time(&server);
killall_over_memory(&server);
killall_cancelled(server);
killall_over_time(server);
killall_over_memory(server);

if (!server.group_cnt) {
assert(!server.jobs_running);
if (!server->group_cnt) {
assert(!server->jobs_running);
assert(!group_cnt);
mx_log_info("Nothing to do. Sleeping for a short while. (1 second)");
sleep(1);
continue;
}

if (server.slots_running == server.slots) {
if (server->slots_running == server->slots) {
mx_log_info("All slots running. Sleeping for a short while (7 seconds).");
sleep(7);
continue;
}

slots_started = start_users(&server);
slots_started = start_users(server);
if (slots_started)
mx_log_info("slots_started=%lu :: Main Loop started %lu slots.", slots_started, slots_started);

if (!slots_started && !slots_returned && !global_sigint_cnt && !global_sigterm_cnt) {
if (!server.jobs_running) {
if (!server->jobs_running) {
mx_log_info("Tried Hard and nobody is doing anything. Sleeping for a long while (15 seconds).");
sleep(15);
} else {
Expand All @@ -2283,37 +2292,45 @@ int main(int argc, char *argv[])
}
/*** clean up ***/

mx_log_info("global_sigint_cnt=%d global_sigterm_cnt=%d global_sigquit_cnt=%d: Exiting.", global_sigint_cnt, global_sigterm_cnt,global_sigquit_cnt);
mx_log_info("global_sigint_cnt=%d global_sigterm_cnt=%d global_sigquit_cnt=%d: Exiting.",
global_sigint_cnt,
global_sigterm_cnt,
global_sigquit_cnt);

if (global_sigterm_cnt||global_sigint_cnt) {
while (server.jobs_running) {
slots_returned = catchall(&server);
slots_returned += fspool_scan(&server);
while (server->jobs_running) {
slots_returned = catchall(server);
slots_returned += fspool_scan(server);

if (slots_returned) {
mx_log_info("jobs_running=%lu slots_returned=%lu global_sigint_cnt=%d global_sigterm_cnt=%d :",
server.jobs_running, slots_returned, global_sigint_cnt, global_sigterm_cnt);
server->jobs_running,
slots_returned,
global_sigint_cnt,
global_sigterm_cnt);
continue;
}
if (global_sigint_cnt)
killall(&server, SIGTERM, 1);
killall(server, SIGTERM, 1);

killall_cancelled(&server);
killall_over_time(&server);
killall_over_memory(&server);
killall_cancelled(server);
killall_over_time(server);
killall_over_memory(server);
mx_log_info("jobs_running=%lu global_sigint_cnt=%d global_sigterm_cnt=%d : Exiting. Wating for jobs to finish. Sleeping for a while.",
server.jobs_running, global_sigint_cnt, global_sigterm_cnt);
server->jobs_running,
global_sigint_cnt,
global_sigterm_cnt);
sleep(1);
}
}

mx_mysql_finish(&(server.mysql));
mx_mysql_finish(&(server->mysql));

server_close(&server);
server_close(server);

mx_log_info("cu, mx.");

mx_log_finish();

return 0;
exit(0);
}

0 comments on commit 79db558

Please sign in to comment.