From 03703e1d478c5f040f53ffe6b6e850754d575ce5 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 14 Oct 2015 18:32:20 +0200 Subject: [PATCH 1/8] mx_util: add mx_malloc_forever --- mx_util.c | 10 ++++++++++ mx_util.h | 1 + 2 files changed, 11 insertions(+) diff --git a/mx_util.c b/mx_util.c index 790fc6b..14d294c 100644 --- a/mx_util.c +++ b/mx_util.c @@ -553,6 +553,16 @@ int mx_strtoi64(char *str, int64_t *to) return 0; } +void *mx_malloc_forever(size_t size) +{ + void *ret; + do { + ret=malloc(size); + assert(ret || (!ret && errno == ENOMEM)); + } while (!ret); + return ret ; +} + char *mx_strdup_forever(char *str) { char *dup; diff --git a/mx_util.h b/mx_util.h index f79d77f..f8585fe 100644 --- a/mx_util.h +++ b/mx_util.h @@ -146,6 +146,7 @@ int mx_strtoi16(char *str, int16_t *to); int mx_strtoi32(char *str, int32_t *to); int mx_strtoi64(char *str, int64_t *to); +void *mx_malloc_forever(size_t size); char *mx_strdup_forever(char *str); int mx_vasprintf_forever(char **strp, const char *fmt, va_list ap); int mx_asprintf_forever(char **strp, const char *fmt, ...) 
__attribute__ ((format(printf, 2, 3))); From 11dacb96ec2207cfc19017e658920b5a0bd43901 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Fri, 16 Oct 2015 15:08:25 +0200 Subject: [PATCH 2/8] mx_util: Add mx_strvec_join() --- mx_util.c | 34 ++++++++++++++++++++++++++++++++++ mx_util.h | 1 + test_mx_util.c | 26 ++++++++++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/mx_util.c b/mx_util.c index 14d294c..e79e467 100644 --- a/mx_util.c +++ b/mx_util.c @@ -1170,3 +1170,37 @@ char **mx_strvec_from_str(char *str) return strvec; } + +char *mx_strvec_join(char *sep,char **strvec) +{ + int elements=0; + int len=0; + char *out; + char *in; + char *p; + int i; + + for (i=0;(in=strvec[i]);i++) { + elements++; + len+=strlen(in); + } + if (elements==0) return mx_strdup_forever(""); + len+=strlen(sep)*(elements-1); + out=mx_malloc_forever(len+1); + p=out; + + for (i=0;i Date: Fri, 16 Oct 2015 15:02:39 +0200 Subject: [PATCH 3/8] mx_util: Add functions to scan and format cpusets --- mx_util.c | 103 +++++++++++++++++++++++++++++++++++++++++++++++++ mx_util.h | 4 ++ test_mx_util.c | 22 +++++++++++ 3 files changed, 129 insertions(+) diff --git a/mx_util.c b/mx_util.c index e79e467..abd53a4 100644 --- a/mx_util.c +++ b/mx_util.c @@ -1171,6 +1171,66 @@ char **mx_strvec_from_str(char *str) return strvec; } +int mx_str_to_cpuset(cpu_set_t* cpuset_ptr,char *str) +{ + char c; + int cpu_low; + int cpu_high; + char *next; + int i; + + CPU_ZERO(cpuset_ptr); + + while (1) { + c=*str; + if (c=='\0') { + break; + } else if (c>='0' && c<='9') { + cpu_low=strtol(str,&next,10); + str=next; + } else { + return -(errno=EINVAL); + } + + if (cpu_low<0 || cpu_low>=CPU_SETSIZE) { + return -(errno=EINVAL); + } + + c=*str; + if (c=='\0') { + CPU_SET(cpu_low,cpuset_ptr); + break; + } else if (c==',') { + CPU_SET(cpu_low,cpuset_ptr); + str++; + } else if (c=='-') { + c=*++str; + if (c>='0' && c<='9') { + cpu_high=strtol(str,&next,10); + str=next; + if (cpu_high<0 || cpu_high>=CPU_SETSIZE || 
cpu_high=CPU_SETSIZE) break; + if (CPU_ISSET(cpu,cpuset_ptr)) { + cpu_low=cpu++; + while (1) { + if (cpu>=CPU_SETSIZE || !CPU_ISSET(cpu,cpuset_ptr)) break; + cpu++; + } + cpu_high=cpu-1; + if (cpu_low==cpu_high) { + mx_asprintf_forever(&str,"%d",cpu_low); + } else { + mx_asprintf_forever(&str,"%d-%d",cpu_low,cpu_high); + } + res=mx_strvec_push_str(&strvec,str); + if (!res) { + mx_strvec_free(strvec); + return NULL; + } + } else { + cpu++; + } + } + + out=mx_strvec_join(",",strvec); + mx_strvec_free(strvec); + return out; +} diff --git a/mx_util.h b/mx_util.h index 9ab0ee5..aeb2489 100644 --- a/mx_util.h +++ b/mx_util.h @@ -6,6 +6,7 @@ #include #include #include +#include #include "mx_log.h" @@ -192,4 +193,7 @@ char** mx_strvec_from_str(char *str); void mx_strvec_free(char **strvec); char* mx_strvec_join(char *sep,char **strvec); +char* mx_cpuset_to_str(cpu_set_t* cpuset_ptr); +int mx_str_to_cpuset(cpu_set_t* cpuset_ptr,char *str); + #endif diff --git a/test_mx_util.c b/test_mx_util.c index f89135c..b290490 100644 --- a/test_mx_util.c +++ b/test_mx_util.c @@ -392,6 +392,27 @@ static void test_mx_strvec() { mx_strvec_free(strvec); } +static void test_mx_cpuset(void) +{ + cpu_set_t cpuset; + char *str; + + assert(mx_str_to_cpuset(&cpuset,"1,2,3,10,11,12,100-102")==0); + assert((str=mx_cpuset_to_str(&cpuset))); + assert(strcmp(str,"1-3,10-12,100-102")==0); + free(str); + + assert(mx_str_to_cpuset(&cpuset,"")==0); + assert((str=mx_cpuset_to_str(&cpuset))); + assert(strcmp(str,"")==0); + free(str); + + assert(mx_str_to_cpuset(&cpuset,"bla")<0); + assert(mx_str_to_cpuset(&cpuset,"5-4")<0); + assert(mx_str_to_cpuset(&cpuset,"-4")<0); + assert(mx_str_to_cpuset(&cpuset,"4-")<0); +} + int main(int argc, char *argv[]) { test_mx_strskipwhitespaces(); @@ -408,5 +429,6 @@ int main(int argc, char *argv[]) test_mx_read_first_line_from_file(); test_mx_strscan(); test_mx_strvec(); + test_mx_cpuset(); return 0; } From 6883f57fa205f936fd0617b90879268ad15507d3 Mon Sep 17 00:00:00 2001 
From: Donald Buczek Date: Wed, 14 Oct 2015 12:09:27 +0200 Subject: [PATCH 4/8] mxqd: find and store cpu affinity --- mxqd.c | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++------ mxqd.h | 3 +++ 2 files changed, 65 insertions(+), 6 deletions(-) diff --git a/mxqd.c b/mxqd.c index 47881c4..4fe825d 100644 --- a/mxqd.c +++ b/mxqd.c @@ -9,6 +9,7 @@ #include #include +#include #include @@ -61,7 +62,7 @@ static void print_usage(void) " %s [options]\n" "\n" "options:\n" - " -j, --slots default: 1\n" + " -j, --slots default: depends on number of cores\n" " -m, --memory default: 2G\n" " -x, --max-memory-per-slot default: /\n" "\n" @@ -190,6 +191,45 @@ int write_pid_to_file(char *fname) return 0; } +static int cpuset_init(struct mxq_server *server) +{ + int res; + int available_cnt; + int cpu; + int slots; + + slots=server->slots; + + res=sched_getaffinity(0,sizeof(server->cpu_set_available),&server->cpu_set_available); + if (res<0) { + mx_log_err("sched_getaffinity: (%m)"); + return(-errno); + } + available_cnt=CPU_COUNT(&server->cpu_set_available); + if (slots) { + if (slots>available_cnt) { + mx_log_err("%d slots requested, but only %d cores available",slots,available_cnt); + return(-(errno=EINVAL)); + } + } else { + if (available_cnt>=16) { + slots=available_cnt-2; + } else if (available_cnt>=4) { + slots=available_cnt-1; + } else { + slots=available_cnt; + } + } + + for (cpu=0;cpuslots;cpu++) { + if (CPU_ISSET(cpu,&server->cpu_set_available)) { + CPU_CLR(cpu,&server->cpu_set_available); + available_cnt--; + } + } + server->slots=slots; + return(0); +} int server_init(struct mxq_server *server, int argc, char *argv[]) { @@ -205,7 +245,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) char arg_nolog = 0; char *str_bootid; int opt; - unsigned long threads_total = 1; + unsigned long threads_total = 0; unsigned long memory_total = 2048; unsigned long memory_max = 0; int i; @@ -288,8 +328,6 @@ int server_init(struct mxq_server *server, int argc, 
char *argv[]) mx_log_err("Invalid argument supplied for option --slots '%s': %m", optctl.optarg); exit(1); } - if (!threads_total) - threads_total = 1; break; case 'm': @@ -428,7 +466,12 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) mx_asprintf_forever(&server->host_id, "%s-%llx-%x", server->boot_id, server->starttime, getpid()); - server->slots = threads_total;; + server->slots = threads_total; + res = cpuset_init(server); + if (res < 0) { + mx_log_err("MAIN: cpuset_init() failed. exiting."); + exit(1); + } server->memory_total = memory_total; server->memory_max_per_slot = memory_max; server->memory_avg_per_slot = (long double)server->memory_total / (long double)server->slots; @@ -1673,6 +1716,18 @@ int recover_from_previous_crash(struct mxq_server *server) return res1+res2; } +static void log_server_cpusets(struct mxq_server *server) +{ + char *available; + char *running; + + available=mx_cpuset_to_str(&server->cpu_set_available); + running=mx_cpuset_to_str(&server->cpu_set_running); + mx_log_info(" server cpuset available: [%s] running: [%s]",available,running); + free (available); + free (running); +} + /**********************************************************************/ static void no_handler(int sig) {} @@ -1718,7 +1773,8 @@ int main(int argc, char *argv[]) mx_log_info(" host_id=%s", server.host_id); mx_log_info("slots=%lu memory_total=%lu memory_avg_per_slot=%.0Lf memory_max_per_slot=%ld :: server initialized.", server.slots, server.memory_total, server.memory_avg_per_slot, server.memory_max_per_slot); - + log_server_cpusets(&server); + /*** database connect ***/ mx_mysql_connect_forever(&(server.mysql)); diff --git a/mxqd.h b/mxqd.h index 8de7d89..eb34b36 100644 --- a/mxqd.h +++ b/mxqd.h @@ -2,6 +2,7 @@ #define __MXQ_SERVER_H__ 1 #include "mx_mysql.h" +#include struct mxq_job_list { struct mxq_group_list *group; @@ -62,11 +63,13 @@ struct mxq_server { unsigned long threads_running; unsigned long slots_running; unsigned long 
memory_used; + cpu_set_t cpu_set_running; unsigned long slots; unsigned long memory_total; long double memory_avg_per_slot; unsigned long memory_max_per_slot; + cpu_set_t cpu_set_available; struct mx_mysql *mysql; From 64488ba4125f0df940788d9f052307e7e9d9f9be Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 14 Oct 2015 14:24:10 +0200 Subject: [PATCH 5/8] mxqd: add and use helper function for cpuset logging --- mxqd.c | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/mxqd.c b/mxqd.c index 4fe825d..fe97d14 100644 --- a/mxqd.c +++ b/mxqd.c @@ -97,6 +97,14 @@ static void print_usage(void) ); } +static void cpuset_log(char *prefix,cpu_set_t *cpuset) +{ + char *str; + str=mx_cpuset_to_str(cpuset); + mx_log_info("%s: [%s]",prefix,str); + free(str); +} + /**********************************************************************/ int setup_cronolog(char *cronolog, char *link, char *format) { @@ -1716,18 +1724,6 @@ int recover_from_previous_crash(struct mxq_server *server) return res1+res2; } -static void log_server_cpusets(struct mxq_server *server) -{ - char *available; - char *running; - - available=mx_cpuset_to_str(&server->cpu_set_available); - running=mx_cpuset_to_str(&server->cpu_set_running); - mx_log_info(" server cpuset available: [%s] running: [%s]",available,running); - free (available); - free (running); -} - /**********************************************************************/ static void no_handler(int sig) {} @@ -1773,8 +1769,8 @@ int main(int argc, char *argv[]) mx_log_info(" host_id=%s", server.host_id); mx_log_info("slots=%lu memory_total=%lu memory_avg_per_slot=%.0Lf memory_max_per_slot=%ld :: server initialized.", server.slots, server.memory_total, server.memory_avg_per_slot, server.memory_max_per_slot); - log_server_cpusets(&server); - + cpuset_log("cpu set available",&server.cpu_set_available); + /*** database connect ***/ mx_mysql_connect_forever(&(server.mysql)); From 
06021ab6665980d3fada9dbb4ba65813722ff03e Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 14 Oct 2015 14:25:05 +0200 Subject: [PATCH 6/8] mxq_job: expand struct to hold cpuset assigned to job --- mxq_job.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mxq_job.h b/mxq_job.h index 9e7ba35..42b2d81 100644 --- a/mxq_job.h +++ b/mxq_job.h @@ -7,6 +7,8 @@ #include #include +#include + #include "mxq_group.h" struct mxq_job { @@ -42,6 +44,7 @@ struct mxq_job { uint32_t host_pid; uint32_t host_slots; + cpu_set_t host_cpu_set; int64_t date_submit; int64_t date_start; From 346a953b71fb80150e45a8abb4a0f7b39eaea890 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 14 Oct 2015 14:25:58 +0200 Subject: [PATCH 7/8] mxqd: assign cpusets to jobs and keep book of running cpus --- mxqd.c | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/mxqd.c b/mxqd.c index fe97d14..2235d62 100644 --- a/mxqd.c +++ b/mxqd.c @@ -105,6 +105,28 @@ static void cpuset_log(char *prefix,cpu_set_t *cpuset) free(str); } +static void cpuset_init_job(cpu_set_t *job_cpu_set,cpu_set_t *available,cpu_set_t *running,int slots) +{ + int cpu; + CPU_ZERO(job_cpu_set); + for (cpu=CPU_SETSIZE-1;slots&&cpu>=0;cpu--) { + if (CPU_ISSET(cpu,available) && !CPU_ISSET(cpu,running)) { + CPU_SET(cpu,job_cpu_set); + CPU_SET(cpu,running); + slots--; + } + } + } + +static void cpuset_clear_running(cpu_set_t *running,cpu_set_t *job) { + int cpu; + for (cpu=0;cpugroup.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id); + cpuset_init_job(&mxqjob.host_cpu_set,&server->cpu_set_available,&server->cpu_set_running,group->slots_per_job); + cpuset_log(" job assgined cpus: ",&mxqjob.host_cpu_set); + mx_mysql_disconnect(server->mysql); pid = fork(); @@ -1393,6 +1418,7 @@ void server_dump(struct mxq_server *server) mx_log_info("memory_used=%lu memory_total=%lu", server->memory_used, server->memory_total); mx_log_info("slots_running=%lu slots=%lu threads_running=%lu 
jobs_running=%lu", server->slots_running, server->slots, server->threads_running, server->jobs_running); + cpuset_log("cpu set running",&server->cpu_set_running); mx_log_info("====================== SERVER DUMP END ======================"); } @@ -1662,6 +1688,7 @@ int catchall(struct mxq_server *server) { } cnt += job->group->slots_per_job; + cpuset_clear_running(&server->cpu_set_running,&j->host_cpu_set); mxq_job_free_content(j); free(job); } From bc92e9dca3ce604ac152baf4bcad202e77f01c45 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 14 Oct 2015 14:48:22 +0200 Subject: [PATCH 8/8] mxqd: restrict job to assigned cpuset --- mxqd.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mxqd.c b/mxqd.c index 2235d62..9bbb798 100644 --- a/mxqd.c +++ b/mxqd.c @@ -985,6 +985,9 @@ static int init_child_process(struct mxq_group_list *group, struct mxq_job *j) umask(j->job_umask); + res=sched_setaffinity(0,sizeof(j->host_cpu_set),&j->host_cpu_set); + if (res<0) mx_log_warning("sched_setaffinity: %m"); + return 1; }