Skip to content

Commit

Permalink
Merge branch 'donald/rewrite/affinity'
Browse files Browse the repository at this point in the history
* donald/rewrite/affinity:
  mxqd: restrict job to assigned cpuset
  mxqd: assign cpusets to jobs an keep book of running cpus
  mxq_job: expand struct to hold cpuset assigned to job
  mxqd: add and use helper function for cpuset logging
  mxqd: find and store cpu affinitity
  mx_util: Add functions to scan and format cpusets
  mx_util: Add mx_strvec_join()
  mx_util: add mx_malloc_forever
  • Loading branch information
mariux committed Oct 16, 2015
2 parents 6273426 + bc92e9d commit 96df4dc
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 5 deletions.
147 changes: 147 additions & 0 deletions mx_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,16 @@ int mx_strtoi64(char *str, int64_t *to)
return 0;
}

void *mx_malloc_forever(size_t size)
{
void *ret;
do {
ret=malloc(size);
assert(ret || (!ret && errno == ENOMEM));
} while (!ret);
return ret ;
}

char *mx_strdup_forever(char *str)
{
char *dup;
Expand Down Expand Up @@ -1160,3 +1170,140 @@ char **mx_strvec_from_str(char *str)

return strvec;
}

int mx_str_to_cpuset(cpu_set_t* cpuset_ptr,char *str)
{
char c;
int cpu_low;
int cpu_high;
char *next;
int i;

CPU_ZERO(cpuset_ptr);

while (1) {
c=*str;
if (c=='\0') {
break;
} else if (c>='0' && c<='9') {
cpu_low=strtol(str,&next,10);
str=next;
} else {
return -(errno=EINVAL);
}

if (cpu_low<0 || cpu_low>=CPU_SETSIZE) {
return -(errno=EINVAL);
}

c=*str;
if (c=='\0') {
CPU_SET(cpu_low,cpuset_ptr);
break;
} else if (c==',') {
CPU_SET(cpu_low,cpuset_ptr);
str++;
} else if (c=='-') {
c=*++str;
if (c>='0' && c<='9') {
cpu_high=strtol(str,&next,10);
str=next;
if (cpu_high<0 || cpu_high>=CPU_SETSIZE || cpu_high<cpu_low) {
return -(errno=EINVAL);
}
for (i=cpu_low;i<=cpu_high;i++) {
CPU_SET(i,cpuset_ptr);
} c=*str++;
if (c=='\0') {
break;
} else if (c==',') {
/* noop */
} else {
return -(errno=EINVAL);
}
} else {
return -(errno=EINVAL);
}
} else {
return -(errno=EINVAL);
}
}
return 0;
}

char *mx_strvec_join(char *sep,char **strvec)
{
int elements=0;
int len=0;
char *out;
char *in;
char *p;
int i;

for (i=0;(in=strvec[i]);i++) {
elements++;
len+=strlen(in);
}
if (elements==0) return mx_strdup_forever("");
len+=strlen(sep)*(elements-1);
out=mx_malloc_forever(len+1);
p=out;

for (i=0;i<elements-1;i++) {
in=strvec[i];
while (*in)
*p++ = *in++;
in=sep;
while (*in)
*p++ = *in++;
}
in=strvec[i];
while (*in)
*p++ = *in++;
in=sep;
*p='\0';
return(out);
}

char *mx_cpuset_to_str(cpu_set_t* cpuset_ptr)
{
char **strvec;
int cpu;
int cpu_low;
int cpu_high;
char *str;
int res;
char *out;

strvec=mx_strvec_new();
if (!strvec) return NULL;

cpu=0;
while(1) {
if (cpu>=CPU_SETSIZE) break;
if (CPU_ISSET(cpu,cpuset_ptr)) {
cpu_low=cpu++;
while (1) {
if (cpu>=CPU_SETSIZE || !CPU_ISSET(cpu,cpuset_ptr)) break;
cpu++;
}
cpu_high=cpu-1;
if (cpu_low==cpu_high) {
mx_asprintf_forever(&str,"%d",cpu_low);
} else {
mx_asprintf_forever(&str,"%d-%d",cpu_low,cpu_high);
}
res=mx_strvec_push_str(&strvec,str);
if (!res) {
mx_strvec_free(strvec);
return NULL;
}
} else {
cpu++;
}
}

out=mx_strvec_join(",",strvec);
mx_strvec_free(strvec);
return out;
}
6 changes: 6 additions & 0 deletions mx_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <stdarg.h>
#include <string.h>
#include <stdio.h>
#include <sched.h>

#include "mx_log.h"

Expand Down Expand Up @@ -146,6 +147,7 @@ int mx_strtoi16(char *str, int16_t *to);
int mx_strtoi32(char *str, int32_t *to);
int mx_strtoi64(char *str, int64_t *to);

void *mx_malloc_forever(size_t size);
char *mx_strdup_forever(char *str);
int mx_vasprintf_forever(char **strp, const char *fmt, va_list ap);
int mx_asprintf_forever(char **strp, const char *fmt, ...) __attribute__ ((format(printf, 2, 3)));
Expand Down Expand Up @@ -189,5 +191,9 @@ int mx_strvec_push_strvec(char*** strvecp, char **strvec);
char* mx_strvec_to_str(char **strvec);
char** mx_strvec_from_str(char *str);
void mx_strvec_free(char **strvec);
char* mx_strvec_join(char *sep,char **strvec);

char* mx_cpuset_to_str(cpu_set_t* cpuset_ptr);
int mx_str_to_cpuset(cpu_set_t* cpuset_ptr,char *str);

#endif
3 changes: 3 additions & 0 deletions mxq_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <sys/time.h>
#include <sys/resource.h>

#include <sched.h>

#include "mxq_group.h"

struct mxq_job {
Expand Down Expand Up @@ -42,6 +44,7 @@ struct mxq_job {

uint32_t host_pid;
uint32_t host_slots;
cpu_set_t host_cpu_set;

int64_t date_submit;
int64_t date_start;
Expand Down
92 changes: 87 additions & 5 deletions mxqd.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <unistd.h>
#include <errno.h>

#include <sched.h>

#include <sysexits.h>

Expand Down Expand Up @@ -61,7 +62,7 @@ static void print_usage(void)
" %s [options]\n"
"\n"
"options:\n"
" -j, --slots <slots> default: 1\n"
" -j, --slots <slots> default: depends on number of cores\n"
" -m, --memory <memory> default: 2G\n"
" -x, --max-memory-per-slot <mem> default: <memory>/<slots>\n"
"\n"
Expand Down Expand Up @@ -96,6 +97,36 @@ static void print_usage(void)
);
}

static void cpuset_log(char *prefix,cpu_set_t *cpuset)
{
char *str;
str=mx_cpuset_to_str(cpuset);
mx_log_info("%s: [%s]",prefix,str);
free(str);
}

static void cpuset_init_job(cpu_set_t *job_cpu_set,cpu_set_t *available,cpu_set_t *running,int slots)
{
int cpu;
CPU_ZERO(job_cpu_set);
for (cpu=CPU_SETSIZE-1;slots&&cpu>=0;cpu--) {
if (CPU_ISSET(cpu,available) && !CPU_ISSET(cpu,running)) {
CPU_SET(cpu,job_cpu_set);
CPU_SET(cpu,running);
slots--;
}
}
}

static void cpuset_clear_running(cpu_set_t *running,cpu_set_t *job) {
int cpu;
for (cpu=0;cpu<CPU_SETSIZE;cpu++) {
if (CPU_ISSET(cpu,job)) {
CPU_CLR(cpu,running);
}
}
}

/**********************************************************************/
int setup_cronolog(char *cronolog, char *link, char *format)
{
Expand Down Expand Up @@ -190,6 +221,45 @@ int write_pid_to_file(char *fname)
return 0;
}

static int cpuset_init(struct mxq_server *server)
{
int res;
int available_cnt;
int cpu;
int slots;

slots=server->slots;

res=sched_getaffinity(0,sizeof(server->cpu_set_available),&server->cpu_set_available);
if (res<0) {
mx_log_err("sched_getaffinity: (%m)");
return(-errno);
}
available_cnt=CPU_COUNT(&server->cpu_set_available);
if (slots) {
if (slots>available_cnt) {
mx_log_err("%d slots requested, but only %d cores available",slots,available_cnt);
return(-(errno=EINVAL));
}
} else {
if (available_cnt>=16) {
slots=available_cnt-2;
} else if (available_cnt>=4) {
slots=available_cnt-1;
} else {
slots=available_cnt;
}
}

for (cpu=0;cpu<CPU_SETSIZE && available_cnt>slots;cpu++) {
if (CPU_ISSET(cpu,&server->cpu_set_available)) {
CPU_CLR(cpu,&server->cpu_set_available);
available_cnt--;
}
}
server->slots=slots;
return(0);
}

int server_init(struct mxq_server *server, int argc, char *argv[])
{
Expand All @@ -205,7 +275,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
char arg_nolog = 0;
char *str_bootid;
int opt;
unsigned long threads_total = 1;
unsigned long threads_total = 0;
unsigned long memory_total = 2048;
unsigned long memory_max = 0;
int i;
Expand Down Expand Up @@ -288,8 +358,6 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
mx_log_err("Invalid argument supplied for option --slots '%s': %m", optctl.optarg);
exit(1);
}
if (!threads_total)
threads_total = 1;
break;

case 'm':
Expand Down Expand Up @@ -432,7 +500,12 @@ int server_init(struct mxq_server *server, int argc, char *argv[])

mx_asprintf_forever(&server->host_id, "%s-%llx-%x", server->boot_id, server->starttime, getpid());

server->slots = threads_total;;
server->slots = threads_total;
res = cpuset_init(server);
if (res < 0) {
mx_log_err("MAIN: cpuset_init() failed. exiting.");
exit(1);
}
server->memory_total = memory_total;
server->memory_max_per_slot = memory_max;
server->memory_avg_per_slot = (long double)server->memory_total / (long double)server->slots;
Expand Down Expand Up @@ -919,6 +992,9 @@ static int init_child_process(struct mxq_group_list *group, struct mxq_job *j)

umask(j->job_umask);

res=sched_setaffinity(0,sizeof(j->host_cpu_set),&j->host_cpu_set);
if (res<0) mx_log_warning("sched_setaffinity: $m");

return 1;
}

Expand Down Expand Up @@ -1037,6 +1113,9 @@ unsigned long start_job(struct mxq_group_list *group)
mx_log_info(" job=%s(%d):%lu:%lu :: new job loaded.",
group->group.user_name, group->group.user_uid, group->group.group_id, mxqjob.job_id);

cpuset_init_job(&mxqjob.host_cpu_set,&server->cpu_set_available,&server->cpu_set_running,group->slots_per_job);
cpuset_log(" job assgined cpus: ",&mxqjob.host_cpu_set);

mx_mysql_disconnect(server->mysql);

pid = fork();
Expand Down Expand Up @@ -1349,6 +1428,7 @@ void server_dump(struct mxq_server *server)

mx_log_info("memory_used=%lu memory_total=%lu", server->memory_used, server->memory_total);
mx_log_info("slots_running=%lu slots=%lu threads_running=%lu jobs_running=%lu", server->slots_running, server->slots, server->threads_running, server->jobs_running);
cpuset_log("cpu set running",&server->cpu_set_running);
mx_log_info("====================== SERVER DUMP END ======================");
}

Expand Down Expand Up @@ -1618,6 +1698,7 @@ int catchall(struct mxq_server *server) {
}

cnt += job->group->slots_per_job;
cpuset_clear_running(&server->cpu_set_running,&j->host_cpu_set);
mxq_job_free_content(j);
free(job);
}
Expand Down Expand Up @@ -1725,6 +1806,7 @@ int main(int argc, char *argv[])
mx_log_info(" host_id=%s", server.host_id);
mx_log_info("slots=%lu memory_total=%lu memory_avg_per_slot=%.0Lf memory_max_per_slot=%ld :: server initialized.",
server.slots, server.memory_total, server.memory_avg_per_slot, server.memory_max_per_slot);
cpuset_log("cpu set available",&server.cpu_set_available);

/*** database connect ***/

Expand Down
3 changes: 3 additions & 0 deletions mxqd.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define __MXQ_SERVER_H__ 1

#include "mx_mysql.h"
#include <sched.h>

struct mxq_job_list {
struct mxq_group_list *group;
Expand Down Expand Up @@ -62,11 +63,13 @@ struct mxq_server {
unsigned long threads_running;
unsigned long slots_running;
unsigned long memory_used;
cpu_set_t cpu_set_running;

unsigned long slots;
unsigned long memory_total;
long double memory_avg_per_slot;
unsigned long memory_max_per_slot;
cpu_set_t cpu_set_available;

struct mx_mysql *mysql;

Expand Down
Loading

0 comments on commit 96df4dc

Please sign in to comment.