diff --git a/Makefile b/Makefile index a4b16467..dad0d7d2 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ MXQ_VERSION_MAJOR = 0 -MXQ_VERSION_MINOR = 16 -MXQ_VERSION_PATCH = 1 +MXQ_VERSION_MINOR = 17 +MXQ_VERSION_PATCH = 0 MXQ_VERSION_EXTRA = "beta" MXQ_VERSIONDATE = 2013-2015 diff --git a/README.md b/README.md index 2c895b3c..fb37fe79 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,15 @@ # mxq MXQ - mariux64 job scheduling system +## Sources +### Main git repository + +https://github.molgen.mpg.de/mariux64/mxq + +### github.com clone + +https://github.com/mariux/mxq + ## Installation ### Install using `GNU make` ``` diff --git a/mx_util.c b/mx_util.c index 790fc6b7..24088b2c 100644 --- a/mx_util.c +++ b/mx_util.c @@ -9,6 +9,8 @@ #include #include +#include + //#include //#include #include @@ -553,6 +555,18 @@ int mx_strtoi64(char *str, int64_t *to) return 0; } +void *mx_malloc_forever(size_t size) +{ + void *ret; + + do { + ret = malloc(size); + assert(ret || (!ret && errno == ENOMEM)); + } while (!ret); + + return ret ; +} + char *mx_strdup_forever(char *str) { char *dup; @@ -1160,3 +1174,149 @@ char **mx_strvec_from_str(char *str) return strvec; } + +int mx_str_to_cpuset(cpu_set_t* cpuset_ptr, char *str) +{ + char c; + int cpu_low; + int cpu_high; + char *next; + int i; + + CPU_ZERO(cpuset_ptr); + + while (1) { + c = *str; + + if (c == '\0') + break; + + if (!isdigit(c)) + return -(errno=EINVAL); + + cpu_low = strtol(str, &next, 10); + str = next; + + if (cpu_low < 0 || cpu_low >= CPU_SETSIZE) + return -(errno=ERANGE); + + c = *str; + + CPU_SET(cpu_low, cpuset_ptr); + + if (c == '\0') { + break; + } else if (c == ',') { + str++; + continue; + } else if (c != '-') { + return -(errno=EINVAL); + } + + str++; + c = *str; + + if (!isdigit(c)) + return -(errno=EINVAL); + + cpu_high = strtol(str, &next, 10); + str = next; + + if (cpu_high < 0 || cpu_high >= CPU_SETSIZE || cpu_high < cpu_low) + return -(errno=ERANGE); + + for (i = cpu_low+1; i <= cpu_high; i++) + 
CPU_SET(i, cpuset_ptr); + + c = *str; + + if (c == '\0') { + break; + } else if (c != ',') { + return -(errno=EINVAL); + } + + str++; + } + return 0; +} + +char *mx_strvec_join(char *sep,char **strvec) +{ + int elements=0; + int len=0; + char *out; + char *in; + char *p; + int i; + + assert(sep); + assert(strvec); + + for (i=0;(in=strvec[i]);i++) { + elements++; + len += strlen(in); + } + + if (elements == 0) + return mx_strdup_forever(""); + + len += strlen(sep)*(elements-1); + out = mx_malloc_forever(len+1); + p = out; + + for (i=0;i=CPU_SETSIZE) + break; + + if (CPU_ISSET(cpu,cpuset_ptr)) { + cpu_low=cpu; + while (1) { + cpu++; + if (cpu>=CPU_SETSIZE || !CPU_ISSET(cpu,cpuset_ptr)) + break; + } + cpu_high=cpu-1; + if (cpu_low==cpu_high) { + mx_asprintf_forever(&str,"%d",cpu_low); + } else { + mx_asprintf_forever(&str,"%d-%d",cpu_low,cpu_high); + } + res=mx_strvec_push_str(&strvec,str); + if (!res) { + mx_strvec_free(strvec); + return NULL; + } + } else { + cpu++; + } + } + + out=mx_strvec_join(",",strvec); + mx_strvec_free(strvec); + return out; +} diff --git a/mx_util.h b/mx_util.h index f79d77fa..f2adb2ff 100644 --- a/mx_util.h +++ b/mx_util.h @@ -6,6 +6,9 @@ #include #include #include +#include +#include +#include #include "mx_log.h" @@ -117,6 +120,18 @@ static inline void __mx_fclose(FILE **ptr) { #undef mx_streq_nocase #define mx_streq_nocase(a, b) (strcasecmp((a), (b)) == 0) +#define mx_within_rate_limit_or_return(sec, ret) \ + do {\ + static struct timeval _sleep = {0};\ + struct timeval _now;\ + struct timeval _delta;\ + gettimeofday(&_now, NULL);\ + timersub(&_now, &_sleep, &_delta);\ + if (_delta.tv_sec < (sec))\ + return (ret);\ + _sleep = _now;\ + } while(0) + int mx_strbeginswith(char *str, const char *start, char **endptr); int mx_stribeginswith(char *str, const char *start, char **endptr); int mx_strbeginswithany(char *str, char **starts, char **endptr); @@ -146,6 +161,7 @@ int mx_strtoi16(char *str, int16_t *to); int mx_strtoi32(char *str, int32_t 
*to); int mx_strtoi64(char *str, int64_t *to); +void *mx_malloc_forever(size_t size); char *mx_strdup_forever(char *str); int mx_vasprintf_forever(char **strp, const char *fmt, va_list ap); int mx_asprintf_forever(char **strp, const char *fmt, ...) __attribute__ ((format(printf, 2, 3))); @@ -189,5 +205,9 @@ int mx_strvec_push_strvec(char*** strvecp, char **strvec); char* mx_strvec_to_str(char **strvec); char** mx_strvec_from_str(char *str); void mx_strvec_free(char **strvec); +char* mx_strvec_join(char *sep,char **strvec); + +char* mx_cpuset_to_str(cpu_set_t* cpuset_ptr); +int mx_str_to_cpuset(cpu_set_t* cpuset_ptr,char *str); #endif diff --git a/mxq_job.h b/mxq_job.h index 9e7ba35c..42b2d81d 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -7,6 +7,8 @@ #include #include +#include + #include "mxq_group.h" struct mxq_job { @@ -42,6 +44,7 @@ struct mxq_job { uint32_t host_pid; uint32_t host_slots; + cpu_set_t host_cpu_set; int64_t date_submit; int64_t date_start; diff --git a/mxqd.c b/mxqd.c index 6b8a206a..874533bf 100644 --- a/mxqd.c +++ b/mxqd.c @@ -9,6 +9,7 @@ #include #include +#include #include @@ -61,7 +62,7 @@ static void print_usage(void) " %s [options]\n" "\n" "options:\n" - " -j, --slots default: 1\n" + " -j, --slots default: depends on number of cores\n" " -m, --memory default: 2G\n" " -x, --max-memory-per-slot default: /\n" "\n" @@ -72,6 +73,7 @@ static void print_usage(void) " --daemonize default: run in foreground\n" " --no-log default: write a logfile\n" " --debug default: info log level\n" + " --recover-only (recover from crash and exit)\n" "\n" " --initial-path default: %s\n" " --initial-tmpdir default: %s\n" @@ -96,6 +98,36 @@ static void print_usage(void) ); } +static void cpuset_log(char *prefix,cpu_set_t *cpuset) +{ + char *str; + str=mx_cpuset_to_str(cpuset); + mx_log_info("%s: [%s]",prefix,str); + free(str); +} + +static void cpuset_init_job(cpu_set_t *job_cpu_set,cpu_set_t *available,cpu_set_t *running,int slots) +{ + int cpu; + 
CPU_ZERO(job_cpu_set); + for (cpu=CPU_SETSIZE-1;slots&&cpu>=0;cpu--) { + if (CPU_ISSET(cpu,available) && !CPU_ISSET(cpu,running)) { + CPU_SET(cpu,job_cpu_set); + CPU_SET(cpu,running); + slots--; + } + } + } + +static void cpuset_clear_running(cpu_set_t *running,cpu_set_t *job) { + int cpu; + for (cpu=0;cpuslots; + + res=sched_getaffinity(0,sizeof(server->cpu_set_available),&server->cpu_set_available); + if (res<0) { + mx_log_err("sched_getaffinity: (%m)"); + return(-errno); + } + available_cnt=CPU_COUNT(&server->cpu_set_available); + if (slots) { + if (slots>available_cnt) { + mx_log_err("%d slots requested, but only %d cores available",slots,available_cnt); + return(-(errno=EINVAL)); + } + } else { + if (available_cnt>=16) { + slots=available_cnt-2; + } else if (available_cnt>=4) { + slots=available_cnt-1; + } else { + slots=available_cnt; + } + } + + for (cpu=0;cpuslots;cpu++) { + if (CPU_ISSET(cpu,&server->cpu_set_available)) { + CPU_CLR(cpu,&server->cpu_set_available); + available_cnt--; + } + } + server->slots=slots; + return(0); +} int server_init(struct mxq_server *server, int argc, char *argv[]) { @@ -203,9 +274,10 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) char *arg_initial_tmpdir; char arg_daemonize = 0; char arg_nolog = 0; + char arg_recoveronly = 0; char *str_bootid; int opt; - unsigned long threads_total = 1; + unsigned long threads_total = 0; unsigned long memory_total = 2048; unsigned long memory_max = 0; int i; @@ -218,6 +290,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) MX_OPTION_NO_ARG("daemonize", 1), MX_OPTION_NO_ARG("no-log", 3), MX_OPTION_NO_ARG("debug", 5), + MX_OPTION_NO_ARG("recover-only", 9), MX_OPTION_REQUIRED_ARG("pid-file", 2), MX_OPTION_REQUIRED_ARG("initial-path", 7), MX_OPTION_REQUIRED_ARG("initial-tmpdir", 8), @@ -275,6 +348,10 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) arg_hostname = optctl.optarg; break; + case 9: + arg_recoveronly = 1; + break; + case 
'V': mxq_print_generic_version(); exit(EX_USAGE); @@ -288,8 +365,6 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) mx_log_err("Invalid argument supplied for option --slots '%s': %m", optctl.optarg); exit(1); } - if (!threads_total) - threads_total = 1; break; case 'm': @@ -360,6 +435,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) server->server_id = arg_server_id; server->initial_path = arg_initial_path; server->initial_tmpdir = arg_initial_tmpdir; + server->recoveronly = arg_recoveronly; server->flock = mx_flock(LOCK_EX, "/dev/shm/mxqd.%s.%s.lck", server->hostname, server->server_id); if (!server->flock) { @@ -432,7 +508,12 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) mx_asprintf_forever(&server->host_id, "%s-%llx-%x", server->boot_id, server->starttime, getpid()); - server->slots = threads_total;; + server->slots = threads_total; + res = cpuset_init(server); + if (res < 0) { + mx_log_err("MAIN: cpuset_init() failed. 
exiting."); + exit(1); + } server->memory_total = memory_total; server->memory_max_per_slot = memory_max; server->memory_avg_per_slot = (long double)server->memory_total / (long double)server->slots; @@ -919,6 +1000,9 @@ static int init_child_process(struct mxq_group_list *group, struct mxq_job *j) umask(j->job_umask); + res=sched_setaffinity(0,sizeof(j->host_cpu_set),&j->host_cpu_set); + if (res<0) mx_log_warning("sched_setaffinity: %m"); + return 1; } @@ -1037,6 +1121,9 @@ unsigned long start_job(struct mxq_group_list *group) mx_log_info(" job=%s(%d):%lu:%lu :: new job loaded.", group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id); + cpuset_init_job(&mxqjob.host_cpu_set,&server->cpu_set_available,&server->cpu_set_running,group->slots_per_job); + cpuset_log(" job assigned cpus: ",&mxqjob.host_cpu_set); + mx_mysql_disconnect(server->mysql); pid = fork(); @@ -1349,6 +1436,7 @@ void server_dump(struct mxq_server *server) mx_log_info("memory_used=%lu memory_total=%lu", server->memory_used, server->memory_total); mx_log_info("slots_running=%lu slots=%lu threads_running=%lu jobs_running=%lu", server->slots_running, server->slots, server->threads_running, server->jobs_running); + cpuset_log("cpu set running",&server->cpu_set_running); mx_log_info("====================== SERVER DUMP END ======================"); } @@ -1421,6 +1509,11 @@ int killall_over_time(struct mxq_server *server) assert(server); + /* limit killing to every >= 5 minutes */ + mx_within_rate_limit_or_return(5*60, 1); + + mx_log_info("killall_over_time: Sending signals to all jobs running longer than requested."); + gettimeofday(&now, NULL); for (user=server->users; user; user=user->next) { @@ -1618,6 +1711,7 @@ int catchall(struct mxq_server *server) { } cnt += job->group->slots_per_job; + cpuset_clear_running(&server->cpu_set_running,&j->host_cpu_set); mxq_job_free_content(j); free(job); } @@ -1728,6 +1822,7 @@ int main(int argc, char *argv[]) mx_log_info(" host_id=%s", 
server.host_id); mx_log_info("slots=%lu memory_total=%lu memory_avg_per_slot=%.0Lf memory_max_per_slot=%ld :: server initialized.", server.slots, server.memory_total, server.memory_avg_per_slot, server.memory_max_per_slot); + cpuset_log("cpu set available",&server.cpu_set_available); /*** database connect ***/ @@ -1752,6 +1847,9 @@ int main(int argc, char *argv[]) if (res > 0) mx_log_warning("total %d jobs recovered from previous crash.", res); + if (server.recoveronly) + fail = 1; + while (!global_sigint_cnt && !global_sigterm_cnt && !fail) { slots_returned = catchall(&server); if (slots_returned) @@ -1816,7 +1914,6 @@ int main(int argc, char *argv[]) killallcancelled(&server, SIGTERM, 0); killallcancelled(&server, SIGINT, 0); killall_over_time(&server); - killall_over_time(&server); mx_log_info("jobs_running=%lu global_sigint_cnt=%d global_sigterm_cnt=%d : Exiting. Wating for jobs to finish. Sleeping for a while.", server.jobs_running, global_sigint_cnt, global_sigterm_cnt); diff --git a/mxqd.h b/mxqd.h index 8de7d89c..4de72d44 100644 --- a/mxqd.h +++ b/mxqd.h @@ -2,6 +2,7 @@ #define __MXQ_SERVER_H__ 1 #include "mx_mysql.h" +#include struct mxq_job_list { struct mxq_group_list *group; @@ -62,11 +63,13 @@ struct mxq_server { unsigned long threads_running; unsigned long slots_running; unsigned long memory_used; + cpu_set_t cpu_set_running; unsigned long slots; unsigned long memory_total; long double memory_avg_per_slot; unsigned long memory_max_per_slot; + cpu_set_t cpu_set_available; struct mx_mysql *mysql; @@ -80,6 +83,7 @@ struct mxq_server { char *initial_path; char *initial_tmpdir; + char recoveronly; }; diff --git a/test_mx_util.c b/test_mx_util.c index b4ef240c..b290490a 100644 --- a/test_mx_util.c +++ b/test_mx_util.c @@ -358,14 +358,61 @@ static void test_mx_strscan(void) static void test_mx_strvec() { char **strvec; + char *str; strvec=mx_strvec_new(); mx_strvec_push_str(&strvec,strdup("Hallo")); mx_strvec_push_str(&strvec,strdup("Bla")); 
mx_strvec_push_str(&strvec,strdup("lall")); + + assert(str=mx_strvec_join("XXX",strvec)); + assert(strcmp(str,"HalloXXXBlaXXXlall")==0); + free(str); + assert(str=mx_strvec_join("",strvec)); + assert(strcmp(str,"HalloBlalall")==0); + free(str); + mx_strvec_free(strvec); + + strvec=mx_strvec_new(); + assert(str=mx_strvec_join("XXX",strvec)); + assert(strcmp(str,"")==0); + free(str); + mx_strvec_push_str(&strvec,strdup("A")); + assert(str=mx_strvec_join("x",strvec)); + assert(strcmp(str,"A")==0); + free(str); + mx_strvec_push_str(&strvec,strdup("")); + assert(str=mx_strvec_join("x",strvec)); + assert(strcmp(str,"Ax")==0); + free(str); + mx_strvec_push_str(&strvec,strdup("B")); + assert(str=mx_strvec_join("x",strvec)); + assert(strcmp(str,"AxxB")==0); + free(str); mx_strvec_free(strvec); } +static void test_mx_cpuset(void) +{ + cpu_set_t cpuset; + char *str; + + assert(mx_str_to_cpuset(&cpuset,"1,2,3,10,11,12,100-102")==0); + assert((str=mx_cpuset_to_str(&cpuset))); + assert(strcmp(str,"1-3,10-12,100-102")==0); + free(str); + + assert(mx_str_to_cpuset(&cpuset,"")==0); + assert((str=mx_cpuset_to_str(&cpuset))); + assert(strcmp(str,"")==0); + free(str); + + assert(mx_str_to_cpuset(&cpuset,"bla")<0); + assert(mx_str_to_cpuset(&cpuset,"5-4")<0); + assert(mx_str_to_cpuset(&cpuset,"-4")<0); + assert(mx_str_to_cpuset(&cpuset,"4-")<0); +} + int main(int argc, char *argv[]) { test_mx_strskipwhitespaces(); @@ -382,5 +429,6 @@ int main(int argc, char *argv[]) test_mx_read_first_line_from_file(); test_mx_strscan(); test_mx_strvec(); + test_mx_cpuset(); return 0; }