Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
mxqd: add hard memory limit for jobs
  • Loading branch information
mariux committed Nov 20, 2015
1 parent 4d9d4cf commit dfe84c7
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 46 deletions.
12 changes: 12 additions & 0 deletions mxq_daemon.c
Expand Up @@ -26,6 +26,8 @@
" daemon_slots," \
" daemon_memory," \
" daemon_time," \
" daemon_memory_limit_slot_soft," \
" daemon_memory_limit_slot_hard," \
" daemon_jobs_running," \
" daemon_slots_running," \
" daemon_threads_running," \
Expand Down Expand Up @@ -56,6 +58,9 @@ static int bind_result_daemon_fields(struct mx_mysql_bind *result, struct mxq_da
res += mx_mysql_bind_var(result, idx++, uint64, &(daemon->daemon_memory));
res += mx_mysql_bind_var(result, idx++, uint32, &(daemon->daemon_time));

res += mx_mysql_bind_var(result, idx++, uint64, &(daemon->daemon_memory_limit_slot_soft));
res += mx_mysql_bind_var(result, idx++, uint64, &(daemon->daemon_memory_limit_slot_hard));

res += mx_mysql_bind_var(result, idx++, uint32, &(daemon->daemon_jobs_running));
res += mx_mysql_bind_var(result, idx++, uint32, &(daemon->daemon_slots_running));
res += mx_mysql_bind_var(result, idx++, uint32, &(daemon->daemon_threads_running));
Expand Down Expand Up @@ -101,6 +106,8 @@ int mxq_daemon_register(struct mx_mysql *mysql, struct mxq_daemon *daemon)
assert(daemon->daemon_pid);
assert(daemon->daemon_slots);
assert(daemon->daemon_memory);
assert(daemon->daemon_memory_limit_slot_soft <= daemon->daemon_memory_limit_slot_hard);
assert(daemon->daemon_memory_limit_slot_hard <= daemon->daemon_memory);

stmt = mx_mysql_statement_prepare(mysql,
"INSERT INTO"
Expand All @@ -116,6 +123,8 @@ int mxq_daemon_register(struct mx_mysql *mysql, struct mxq_daemon *daemon)
" daemon_slots = ?,"
" daemon_memory = ?,"
" daemon_time = ?,"
" daemon_memory_limit_slot_soft = ?,"
" daemon_memory_limit_slot_hard = ?,"
" daemon_jobs_running = 0,"
" daemon_slots_running = 0,"
" daemon_threads_running = 0,"
Expand Down Expand Up @@ -145,6 +154,9 @@ int mxq_daemon_register(struct mx_mysql *mysql, struct mxq_daemon *daemon)
res += mx_mysql_statement_param_bind(stmt, idx++, uint64, &(daemon->daemon_memory));
res += mx_mysql_statement_param_bind(stmt, idx++, uint32, &(daemon->daemon_time));

res += mx_mysql_statement_param_bind(stmt, idx++, uint64, &(daemon->daemon_memory_limit_slot_soft));
res += mx_mysql_statement_param_bind(stmt, idx++, uint64, &(daemon->daemon_memory_limit_slot_hard));

assert(res ==0);

res = mx_mysql_statement_execute(stmt, &num_rows);
Expand Down
3 changes: 3 additions & 0 deletions mxq_daemon.h
Expand Up @@ -24,6 +24,9 @@ struct mxq_daemon {
uint64_t daemon_memory;
uint32_t daemon_time;

uint64_t daemon_memory_limit_slot_soft;
uint64_t daemon_memory_limit_slot_hard;

uint32_t daemon_jobs_running;
uint32_t daemon_slots_running;
uint32_t daemon_threads_running;
Expand Down
73 changes: 54 additions & 19 deletions mxqd.c
Expand Up @@ -72,11 +72,17 @@ static void print_usage(void)
"\n"
"options:\n"
" -j, --slots <slots> default: depends on number of cores\n"
" -m, --memory <memory> default: 2G\n"
" -x, --max-memory-per-slot <mem> default: <memory>/<slots>\n"
" -m, --memory <totalmemory> default: 2G\n"
"\n"
" -x, --max-memory-per-slot-soft <softlimit>\n"
" root user: default: <totalmemory>/<slots>\n"
" non-root user: default: <totalmemory>\n"
"\n"
" -X, --max-memory-per-slot-hard <hardlimit>\n"
" default: <totalmemory>\n"
"\n"
" -N, --server-id <id> default: main\n"
" --hostname <hostname> default: $(hostname)\n"
" --hostname <hostname> default: system hostname\n"
"\n"
" --pid-file <pidfile> default: create no pid file\n"
" --daemonize default: run in foreground\n"
Expand Down Expand Up @@ -313,7 +319,8 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
int opt;
unsigned long arg_threads_total = 0;
unsigned long arg_memory_total = 2048;
unsigned long arg_memory_max = 0;
unsigned long arg_memory_limit_slot_soft = 0;
unsigned long arg_memory_limit_slot_hard = 0;
int i;

_mx_cleanup_free_ struct mx_proc_pid_stat *pps = NULL;
Expand All @@ -334,7 +341,9 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
MX_OPTION_REQUIRED_ARG("initial-tmpdir", 8),
MX_OPTION_REQUIRED_ARG("slots", 'j'),
MX_OPTION_REQUIRED_ARG("memory", 'm'),
MX_OPTION_REQUIRED_ARG("max-memory-per-slot", 'x'),
MX_OPTION_REQUIRED_ARG("max-memory-per-slot", 'x'),
MX_OPTION_REQUIRED_ARG("max-memory-per-slot-soft", 'x'),
MX_OPTION_REQUIRED_ARG("max-memory-per-slot-hard", 'X'),
MX_OPTION_REQUIRED_ARG("server-id", 'N'),
MX_OPTION_REQUIRED_ARG("hostname", 6),
MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'),
Expand Down Expand Up @@ -443,14 +452,26 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
break;

case 'x':
if (mx_strtoul(optctl.optarg, &arg_memory_max) < 0) {
if (mx_strtoul(optctl.optarg, &arg_memory_limit_slot_soft) < 0) {
unsigned long long int bytes;

if(mx_strtobytes(optctl.optarg, &bytes) < 0) {
mx_log_err("Invalid argument supplied for option --max-memory-per-slot '%s': %m", optctl.optarg);
mx_log_err("Invalid argument supplied for option --max-memory-per-slot-soft '%s': %m", optctl.optarg);
return -EX_USAGE;
}
arg_memory_max = bytes/1024/1024;
arg_memory_limit_slot_soft = bytes/1024/1024;
}
break;

case 'X':
if (mx_strtoul(optctl.optarg, &arg_memory_limit_slot_hard) < 0) {
unsigned long long int bytes;

if(mx_strtobytes(optctl.optarg, &bytes) < 0) {
mx_log_err("Invalid argument supplied for option --max-memory-per-slot-hard '%s': %m", optctl.optarg);
return -EX_USAGE;
}
arg_memory_limit_slot_hard = bytes/1024/1024;
}
break;

Expand Down Expand Up @@ -589,19 +610,32 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
return -EX_OSERR;
}
server->memory_total = arg_memory_total;
server->memory_max_per_slot = arg_memory_max;

/* if run as non-root use full memory by default for every job */
if (!arg_memory_max && !RUNNING_AS_ROOT)
server->memory_max_per_slot = arg_memory_total;

server->memory_avg_per_slot = (long double)server->memory_total / (long double)server->slots;

if (server->memory_max_per_slot < server->memory_avg_per_slot)
server->memory_max_per_slot = server->memory_avg_per_slot;
if (!arg_memory_limit_slot_hard) {
arg_memory_limit_slot_hard = server->memory_total;
} else if (arg_memory_limit_slot_hard < server->memory_avg_per_slot) {
arg_memory_limit_slot_hard = server->memory_avg_per_slot;
} else if (arg_memory_limit_slot_hard > server->memory_total) {
arg_memory_limit_slot_hard = server->memory_total;
}
server->memory_limit_slot_hard = arg_memory_limit_slot_hard;

if (server->memory_max_per_slot > server->memory_total)
server->memory_max_per_slot = server->memory_total;
if (!arg_memory_limit_slot_soft) {
if (RUNNING_AS_ROOT) {
arg_memory_limit_slot_soft = server->memory_avg_per_slot;
} else {
arg_memory_limit_slot_soft = server->memory_total;
}
} else if (arg_memory_limit_slot_soft > server->memory_limit_slot_hard) {
arg_memory_limit_slot_soft = server->memory_limit_slot_hard;
} else if (arg_memory_limit_slot_soft < server->memory_avg_per_slot) {
arg_memory_limit_slot_soft = server->memory_avg_per_slot;
} else if (arg_memory_limit_slot_soft > server->memory_total) {
arg_memory_limit_slot_soft = server->memory_total;
}
server->memory_limit_slot_soft = arg_memory_limit_slot_soft;

return 0;
}
Expand Down Expand Up @@ -2265,11 +2299,12 @@ int main(int argc, char *argv[])
server->hostname,
server->server_id);
mx_log_info(" host_id=%s", server->host_id);
mx_log_info("slots=%lu memory_total=%lu memory_avg_per_slot=%.0Lf memory_max_per_slot=%ld :: server initialized.",
mx_log_info("slots=%lu memory_total=%lu memory_avg_per_slot=%.0Lf memory_limit_slot_soft=%ld memory_limit_slot_hard=%ld :: server initialized.",
server->slots,
server->memory_total,
server->memory_avg_per_slot,
server->memory_max_per_slot);
server->memory_limit_slot_soft,
server->memory_limit_slot_hard);
cpuset_log("cpu set available", &(server->cpu_set_available));

/*** database connect ***/
Expand Down
7 changes: 4 additions & 3 deletions mxqd.h
Expand Up @@ -28,9 +28,9 @@ struct mxq_group_list {

unsigned long job_cnt;

long double memory_per_thread;
long double memory_per_job_thread;
unsigned long slots_per_job;
long double memory_max_available;
long double memory_available_for_group;
unsigned long memory_max;
unsigned long slots_max;
unsigned long jobs_max;
Expand Down Expand Up @@ -84,7 +84,8 @@ struct mxq_server {
unsigned long slots;
unsigned long memory_total;
long double memory_avg_per_slot;
unsigned long memory_max_per_slot;
unsigned long memory_limit_slot_soft;
unsigned long memory_limit_slot_hard;
cpu_set_t cpu_set_available;

struct mx_mysql *mysql;
Expand Down
56 changes: 33 additions & 23 deletions mxqd_control.c
Expand Up @@ -14,11 +14,12 @@ static void _group_list_init(struct mxq_group_list *glist)
struct mxq_server *server;
struct mxq_group *group;

long double memory_threads;
long double memory_per_thread;
long double memory_max_available;
long double memory_per_job_thread;
long double memory_available_for_group;

unsigned long slots_per_job;
unsigned long slots_per_job_memory;
unsigned long slots_per_job_cpu;
unsigned long jobs_max;
unsigned long slots_max;
unsigned long memory_max;
Expand All @@ -30,27 +31,36 @@ static void _group_list_init(struct mxq_group_list *glist)
server = glist->user->server;
group = &glist->group;

memory_per_thread = (long double)group->job_memory / (long double)group->job_threads;
memory_max_available = (long double)server->memory_total * (long double)server->memory_max_per_slot / memory_per_thread;
memory_per_job_thread = (long double)group->job_memory / (long double)group->job_threads;

if (memory_max_available > server->memory_total)
memory_max_available = server->memory_total;
/* max_memory_per_server_slot_soft < memory_per_job_thread => limit total memory for group default: avg_memory_per_server_slot*/
/* max_memory_per_server_slot_hard < memory_per_job_thread => do not start jobs for group default: memory_total */

slots_per_job = ceill((long double)group->job_memory / server->memory_avg_per_slot);
/* memory_available_for_group = memory_total * max_memory_per_server_slot_soft / memory_per_job_thread */
memory_available_for_group = (long double)server->memory_total * (long double)server->memory_limit_slot_soft / memory_per_job_thread;

if (slots_per_job < group->job_threads)
slots_per_job = group->job_threads;
if (memory_available_for_group > (long double)server->memory_total)
memory_available_for_group = (long double)server->memory_total;

memory_threads = memory_max_available / memory_per_thread;
/* memory_slots_per_job = memory_per_job / memory_per_server_slot */
/* cpu_slots_per_job = job_threads */
/* slots_per_job = max(memory_slots_per_job, cpu_slots_per_job) */

if (memory_per_thread > server->memory_max_per_slot) {
jobs_max = memory_threads + 0.5;
} else if (memory_per_thread > server->memory_avg_per_slot) {
jobs_max = memory_threads + 0.5;
slots_per_job_memory = (unsigned long)ceill((long double)group->job_memory / server->memory_avg_per_slot);
slots_per_job_cpu = group->job_threads;

if (slots_per_job_memory < slots_per_job_cpu)
slots_per_job = slots_per_job_cpu;
else
slots_per_job = slots_per_job_memory;

if (memory_per_job_thread > server->memory_limit_slot_hard) {
jobs_max = 0;
} else if (memory_per_job_thread > server->memory_avg_per_slot) {
jobs_max = (unsigned long)ceill(memory_available_for_group / (long double)group->job_memory);
} else {
jobs_max = server->slots;
jobs_max = server->slots / group->job_threads;
}
jobs_max /= group->job_threads;

if (jobs_max > server->slots / slots_per_job)
jobs_max = server->slots / slots_per_job;
Expand All @@ -62,26 +72,26 @@ static void _group_list_init(struct mxq_group_list *glist)
slots_max = jobs_max * slots_per_job;
memory_max = jobs_max * group->job_memory;

if (glist->memory_per_thread != memory_per_thread
|| glist->memory_max_available != memory_max_available
|| glist->memory_max_available != memory_max_available
if (glist->memory_per_job_thread != memory_per_job_thread
|| glist->memory_available_for_group != memory_available_for_group
|| glist->slots_per_job != slots_per_job
|| glist->jobs_max != jobs_max
|| glist->slots_max != slots_max
|| glist->memory_max != memory_max) {
mx_log_info(" group=%s(%u):%lu jobs_max=%lu slots_max=%lu memory_max=%lu slots_per_job=%lu :: group %sinitialized.",
mx_log_info(" group=%s(%u):%lu jobs_max=%lu slots_max=%lu memory_max=%lu slots_per_job=%lu memory_per_job_thread=%Lf :: group %sinitialized.",
group->user_name,
group->user_uid,
group->group_id,
jobs_max,
slots_max,
memory_max,
slots_per_job,
memory_per_job_thread,
glist->orphaned ? "re" : "");
}

glist->memory_per_thread = memory_per_thread;
glist->memory_max_available = memory_max_available;
glist->memory_per_job_thread = memory_per_job_thread;
glist->memory_available_for_group = memory_available_for_group;

glist->slots_per_job = slots_per_job;

Expand Down
3 changes: 3 additions & 0 deletions mysql/create_tables.sql
Expand Up @@ -148,6 +148,9 @@ CREATE TABLE IF NOT EXISTS mxq_daemon (
daemon_memory INT8 UNSIGNED NOT NULL DEFAULT 0,
daemon_time INT4 UNSIGNED NOT NULL DEFAULT 0,

daemon_memory_limit_slot_soft INT8 UNSIGNED NOT NULL DEFAULT 0,
daemon_memory_limit_slot_hard INT8 UNSIGNED NOT NULL DEFAULT 0,

daemon_jobs_running INT4 UNSIGNED NOT NULL DEFAULT 0,
daemon_slots_running INT4 UNSIGNED NOT NULL DEFAULT 0,
daemon_threads_running INT4 UNSIGNED NOT NULL DEFAULT 0,
Expand Down
2 changes: 1 addition & 1 deletion test_mxqd_control.c
Expand Up @@ -12,7 +12,7 @@
void __init_server(struct mxq_server *server)
{
server->memory_total = MEMORY_TOTAL;
server->memory_max_per_slot = MEMORY_MAX_PER_SLOT;
server->memory_limit_slot_soft = MEMORY_MAX_PER_SLOT;
server->slots = SLOTS;
server->memory_avg_per_slot = MEMORY_TOTAL / SLOTS;
}
Expand Down

0 comments on commit dfe84c7

Please sign in to comment.