Skip to content

Add exclusive flag #85

Closed
wants to merge 11 commits into from
8 changes: 4 additions & 4 deletions Makefile
Expand Up @@ -418,10 +418,10 @@ clean: CLEAN += mxqadmin.o

### mxqset.o ----------------------------------------------------------

mxqsset.o: $(mx_mysql.h)
mxqsset.o: $(keywordset.h)
mxqkill.o: $(mxq.h)
mxqkill.o: $(mxq_group.h)
mxqset.o: $(mx_mysql.h)
mxqset.o: $(keywordset.h)
mxqset.o: $(mxq.h)
mxqset.o: $(mxq_group.h)
mxqset.o: CFLAGS += $(CFLAGS_MYSQL)

clean: CLEAN += mxqsset.o
Expand Down
38 changes: 0 additions & 38 deletions mx_util.c
Expand Up @@ -19,8 +19,6 @@
#include "mx_log.h"
#include "mx_util.h"

static inline size_t mx_strvec_length_cache(char **strvec, int32_t len);

static inline int _mx_strbeginswith(char *str, const char *start, char **endptr, short ignore_case)
{
size_t len;
Expand Down Expand Up @@ -933,40 +931,14 @@ void *mx_calloc_forever_sec(size_t nmemb, size_t size, unsigned int time)
char **mx_strvec_new(void)
{
char **strvec;
size_t len;

strvec = calloc(sizeof(*strvec), 1);
if (!strvec)
return NULL;

len = mx_strvec_length_cache(strvec, -1);
if (len != -1)
mx_strvec_length_cache(strvec, 0);

return strvec;
}

static inline size_t mx_strvec_length_cache(char **strvec, int32_t len)
{
static char ** sv = NULL;
static size_t l = 0;

if (likely(len == -1)) {
if (likely(sv == strvec)) {
return l;
}
return -1;
}

if (likely(sv == strvec)) {
l = len;
} else {
sv = strvec;
l = len;
}
return l;
}

size_t mx_strvec_length(char ** strvec)
{
char ** sv;
Expand All @@ -975,15 +947,9 @@ size_t mx_strvec_length(char ** strvec)
assert(strvec);

sv = strvec;

len = mx_strvec_length_cache(sv, -1);
if (len != -1)
return len;

for (; *sv; sv++);

len = sv-strvec;
mx_strvec_length_cache(sv, len);
return len;
}

Expand All @@ -1007,8 +973,6 @@ int mx_strvec_push_str(char *** strvecp, char * str)
sv[len++] = str;
sv[len] = NULL;

mx_strvec_length_cache(sv, len);

*strvecp = sv;

return 1;
Expand All @@ -1035,8 +999,6 @@ int mx_strvec_push_strvec(char ***strvecp, char **strvec)

memcpy(sv+len1, strvec, sizeof(*strvec) * (len2 + 1));

mx_strvec_length_cache(sv, len1+len2);

*strvecp = sv;

return 1;
Expand Down
9 changes: 7 additions & 2 deletions mxq_daemon.c
Expand Up @@ -34,7 +34,8 @@
" daemon_memory_used," \
" UNIX_TIMESTAMP(mtime) as mtime," \
" UNIX_TIMESTAMP(daemon_start) as daemon_start," \
" UNIX_TIMESTAMP(daemon_stop) as daemon_stop"
" UNIX_TIMESTAMP(daemon_stop) as daemon_stop," \
" daemon_flags"

#undef _to_string
#undef status_str
Expand Down Expand Up @@ -75,6 +76,8 @@ static int bind_result_daemon_fields(struct mx_mysql_bind *result, struct mxq_da
res += mx_mysql_bind_var(result, idx++, int64, &(daemon->daemon_start.tv_sec));
res += mx_mysql_bind_var(result, idx++, int64, &(daemon->daemon_stop.tv_sec));

res += mx_mysql_bind_var(result, idx++, int32, &(daemon->daemon_flags));

return res;
}

Expand Down Expand Up @@ -136,7 +139,8 @@ int mxq_daemon_register(struct mx_mysql *mysql, struct mxq_daemon *daemon)
" daemon_memory_used = 0,"
" mtime = NULL,"
" daemon_start = CURRENT_TIMESTAMP(),"
" daemon_stop = 0"
" daemon_stop = 0,"
" daemon_flags = ?"
);
if (!stmt) {
mx_log_err("mx_mysql_statement_prepare(): %m");
Expand All @@ -161,6 +165,7 @@ int mxq_daemon_register(struct mx_mysql *mysql, struct mxq_daemon *daemon)

res += mx_mysql_statement_param_bind(stmt, idx++, uint64, &(daemon->daemon_memory_limit_slot_soft));
res += mx_mysql_statement_param_bind(stmt, idx++, uint64, &(daemon->daemon_memory_limit_slot_hard));
res += mx_mysql_statement_param_bind(stmt, idx++, int32, &(daemon->daemon_flags));

assert(res ==0);

Expand Down
2 changes: 2 additions & 0 deletions mxq_daemon.h
Expand Up @@ -46,6 +46,8 @@ struct mxq_daemon {

struct timeval daemon_start;
struct timeval daemon_stop;

int daemon_flags;
};

void mxq_daemon_free_content(struct mxq_daemon *daemon);
Expand Down
13 changes: 12 additions & 1 deletion mxqd.c
Expand Up @@ -102,6 +102,7 @@ static void print_usage(void)
" --debug default: info log level\n"
"\n"
" --recover-only (recover from crash and exit)\n"
" --exclusive run in exclusive mode\n"
"\n"
" --initial-path <path> default: %s\n"
" --initial-tmpdir <directory> default: %s\n"
Expand Down Expand Up @@ -341,6 +342,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
char arg_daemonize = 0;
char arg_nolog = 0;
char arg_recoveronly = 0;
int arg_exclusive = 0;
char *str_bootid;
int opt;
unsigned long arg_threads_total = 0;
Expand Down Expand Up @@ -378,6 +380,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'),
MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'),
MX_OPTION_OPTIONAL_ARG("max-time", 't'),
MX_OPTION_NO_ARG("exclusive", 11),
MX_OPTION_END
};

Expand Down Expand Up @@ -531,6 +534,10 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
return -EX_USAGE;
}
break;

case 11:
arg_exclusive = 1;
break;
}
}

Expand Down Expand Up @@ -712,6 +719,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[])
daemon->daemon_maxtime = server->maxtime;
daemon->daemon_memory_limit_slot_soft = server->memory_limit_slot_soft;
daemon->daemon_memory_limit_slot_hard = server->memory_limit_slot_hard;
daemon->daemon_flags = arg_exclusive;

return 0;
}
Expand Down Expand Up @@ -1300,6 +1308,8 @@ static int server_is_qualified(struct mxq_server *server, struct mxq_group *grou
is_qualified = 1;
keywordset_free(kws);
} else {
if (server->daemon.daemon_flags)
return 0; // exclusive
is_qualified = 1;
}

Expand Down Expand Up @@ -1862,7 +1872,8 @@ static int job_has_finished(struct mxq_server *server, struct mxq_group *group,

job=&jlist->job;

unmount_job_tmpdir(job->job_id);
if (group->job_tmpdir_size > 0)
unmount_job_tmpdir(job->job_id);

mxq_set_job_status_exited(server->mysql, job);

Expand Down
2 changes: 2 additions & 0 deletions mysql/create_tables.sql
Expand Up @@ -165,6 +165,8 @@ CREATE TABLE IF NOT EXISTS mxq_daemon (
daemon_start TIMESTAMP NOT NULL DEFAULT 0,
daemon_stop TIMESTAMP NOT NULL DEFAULT 0,

daemon_flags INT4 NOT NULL DEFAULT 0,

INDEX (daemon_name(64)),
INDEX (hostname(64))
);
37 changes: 12 additions & 25 deletions test_mx_util.c
Expand Up @@ -348,30 +348,6 @@ static void test_mx_strscan(void)
mx_proc_pid_stat_free_content(pps);
}

static void test_mx_strvec_cachebug() {
char **strvec;
char **strvec2;
char *str;

strvec = mx_strvec_new();
assert(mx_strvec_length(strvec) == 0);

mx_strvec_push_str(&strvec, "Eins");
assert(mx_strvec_length(strvec) == 1);

str = mx_strvec_to_str(strvec);
assert(mx_streq(str, "Eins\\0"));

free(strvec); /* do not set to NULL for cache bug testing */

strvec2 = mx_strvec_new();
assert(mx_strvec_length(strvec2) == 0);
if (strvec != strvec2)
fprintf(stderr, "Warning: Can't test strvec cache bug. Skipping.\n");
mx_free_null(strvec2);
mx_free_null(str);
}

static void test_mx_strvec() {
char **strvec;
char *str;
Expand Down Expand Up @@ -406,6 +382,18 @@ static void test_mx_strvec() {
assert(strcmp(str,"AxxB")==0);
free(str);
mx_strvec_free(strvec);

char *test[] = { "AAA", "", "bbb", NULL};
str = mx_strvec_to_str(test);
assert(strcmp(str,"AAA\\0\\0bbb\\0")==0);
strvec = mx_strvec_from_str(str);
assert( strcmp(strvec[0], "AAA") == 0 );
assert( strcmp(strvec[1], "") == 0 );
assert( strcmp(strvec[2], "bbb") == 0 );
assert( strvec[3] == NULL);

free(str);
free(strvec);
}

static void test_mx_strcat() {
Expand Down Expand Up @@ -550,7 +538,6 @@ int main(int argc, char *argv[])
test_mx_read_first_line_from_file();
test_mx_strscan();
test_mx_strvec();
test_mx_strvec_cachebug();
test_mx_strcat();
test_mx_cpuset();
test_listsort();
Expand Down
12 changes: 7 additions & 5 deletions web/pages/mxq/mxq.in
Expand Up @@ -776,7 +776,7 @@ sub server() {
daemon_memory_limit_slot_soft daemon_memory_limit_slot_hard
daemon_jobs_running daemon_slots_running
daemon_threads_running daemon_memory_used
mtime daemon_start daemon_stop
mtime daemon_start daemon_stop daemon_flags
);

my $sth=$dbh->prepare('SELECT '.join(',',@cols).' FROM mxq_daemon WHERE status<=200 ORDER BY hostname,daemon_name');
Expand Down Expand Up @@ -807,6 +807,7 @@ sub server() {
# 'mtime',
# 'start',
# 'stop',
'F'
]));

my %S;
Expand All @@ -817,7 +818,7 @@ sub server() {
$daemon_memory_limit_slot_soft,$daemon_memory_limit_slot_hard,
$daemon_jobs_running,$daemon_slots_running,
$daemon_threads_running,$daemon_memory_used,
$mtime,$daemon_start,$daemon_stop
$mtime,$daemon_start,$daemon_stop,$daemon_flags
) = @$row;

$hostname =~s/\.molgen\.mpg\.de$//;
Expand Down Expand Up @@ -851,18 +852,19 @@ sub server() {
# $q->td($mtime),
# $q->td($daemon_start),
# $q->td($daemon_stop),
$q->td($daemon_flags ? 'X' : '&nbsp;' ),
);
}
map {
$out.=$q->Tr( $q->td(0),$q->td('-'),$q->td('no mxqd'),$q->td($_),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),);
$out.=$q->Tr( $q->td(0),$q->td('-'),$q->td('no mxqd'),$q->td($_),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'));
} keys %{$dead_hosts};

$out.=$q->Tr($q->td({colspan=>14},'&nbsp;'));
$out.=$q->Tr($q->td({colspan=>15},'&nbsp;'));
my $dist = join(', ',map {"$S{daemon_slots_dist}->{$_}x$_"} sort {$b <=> $a} keys %{$S{daemon_slots_dist}});
$out.=$q->Tr(
$q->td({class=>'center', colspan=>3},$S{servers}.' servers'),$q->td($dist),
$q->td({class=>'center', colspan=>3},$S{daemon_slots}.' cores'),$q->td({class=>'number'},size($S{daemon_memory_sum}*1000**2)),$q->td('&nbsp;'),$q->td('&nbsp;'),$q->td('&nbsp;'),
$q->td({class=>'number'},$S{daemon_slots_running}),$q->td('&nbsp;'),$q->td({class=>'number'},size($S{daemon_memory_used_sum}*1000**2)),);
$q->td({class=>'number'},$S{daemon_slots_running}),$q->td('&nbsp;'),$q->td({class=>'number'},size($S{daemon_memory_used_sum}*1000**2)),$q->td('&nbsp;'));

$out.='</table>';
return $out;
Expand Down