diff --git a/.gitignore b/.gitignore index 4599a3ca..3097c28d 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,11 @@ mxqd_control.o keywordset.o test_keywordset.o mxqset.o +parser.tab.c +parser.tab.h +parser.tab.o +test_parser.o +test_parser mxqsub diff --git a/Makefile b/Makefile index 921d3409..0b99359c 100644 --- a/Makefile +++ b/Makefile @@ -329,6 +329,7 @@ mxq_daemon.h += mxq_daemon.h ### mxqd.h ------------------------------------------------------------- mxqd.h += mxqd.h +mxqd.h += keywordset.h ### mxqd_conrol.h ------------------------------------------------------ @@ -418,10 +419,10 @@ clean: CLEAN += mxqadmin.o ### mxqset.o ---------------------------------------------------------- -mxqsset.o: $(mx_mysql.h) -mxqsset.o: $(keywordset.h) -mxqkill.o: $(mxq.h) -mxqkill.o: $(mxq_group.h) +mxqset.o: $(mx_mysql.h) +mxqset.o: $(keywordset.h) +mxqset.o: $(mxq.h) +mxqset.o: $(mxq_group.h) mxqset.o: CFLAGS += $(CFLAGS_MYSQL) clean: CLEAN += mxqsset.o @@ -487,6 +488,7 @@ mxqd.o: $(mxq_group.h) mxqd.o: $(mxq_job.h) mxqd.o: $(mx_mysql.h) mxqd.o: $(keywordset.h) +mxqd.o: parser.tab.h mxqd.o: CFLAGS += $(CFLAGS_MYSQL) mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_PATH) mxqd.o: CFLAGS += $(CFLAGS_MXQ_INITIAL_TMPDIR) @@ -506,6 +508,7 @@ mxqsub.o: $(mxq_group.h) mxqsub.o: $(mxq_job.h) mxqsub.o: $(mx_util.h) mxqsub.o: $(keywordset.h) +mxqsub.o: parser.tab.h mxqsub.o: CFLAGS += $(CFLAGS_MYSQL) clean: CLEAN += mxqsub.o @@ -533,6 +536,7 @@ mxqd: mxq_job.o mxqd: mx_mysql.o mxqd: mxqd_control.o mxqd: keywordset.o +mxqd: parser.tab.o mxqd: LDLIBS += $(LDLIBS_MYSQL) build: mxqd @@ -549,6 +553,7 @@ mxqsub: mx_util.o mxqsub: mx_log.o mxqsub: mx_mysql.o mxqsub: keywordset.o +mxqsub: parser.tab.o mxqsub: LDLIBS += $(LDLIBS_MYSQL) build: mxqsub @@ -713,6 +718,7 @@ test_mxqd_control: mx_log.o test_mxqd_control: mx_util.o test_mxqd_control: mx_mysql.o test_mxqd_control: mxq_group.o +test_mxqd_control: keywordset.o test_mxqd_control: LDLIBS += $(LDLIBS_MYSQL) clean: CLEAN += test_mxqd_control @@ -724,3 +730,14 @@ test_keywordset: test_keywordset.o test_keywordset: keywordset.o clean: CLEAN += test_keywordset.o test: test_keywordset + +%.tab.c %.tab.h: %.y + $(call quiet-command,bison -d $<," BISON $@") + +clean: CLEAN += parser.tab.c parser.tab.h +test: test_parser +clean: CLEAN += test_parser +clean: CLEAN += test_parser.o +test_parser: test_parser.o parser.tab.o keywordset.o +test_parser.o: parser.tab.h keywordset.h +clean: CLEAN += parser.tab.o diff --git a/keywordset.c b/keywordset.c index b95adb5c..1ce14df3 100644 --- a/keywordset.c +++ b/keywordset.c @@ -24,8 +24,12 @@ static int find_name(struct keywordset *kws, char *name, size_t len) { if (kws->names[i][j] != name[j]) break; j++; - if (j==len) - return i; + if (j==len) { + if (kws->names[i][j] == 0) + return i; + else + break; + } } } return -1; @@ -111,6 +115,21 @@ void keywordset_update(struct keywordset *kws, char *input) { keywordset_update_phase(kws, input, PHASE_UPDATE); } +void keywordset_add(struct keywordset *kws, char *input) { + char *c=input; + char *name_start; + while (*c) { + while (*c && isspace(*c)) + c++; + if (*c) { + name_start=c++; + while (*c && !isspace(*c)) + c++; + add_name(kws, name_start, c-name_start); + } + } +} + struct keywordset *keywordset_new(char *input) { struct keywordset *kws = xmalloc(sizeof(*kws)); kws->nr_slots = KEYWORDSET_INITIAL_SLOTS; diff --git a/keywordset.h b/keywordset.h index 7484235b..2877a8ac 100644 --- a/keywordset.h +++ b/keywordset.h @@ -3,9 +3,12 @@ struct keywordset *keywordset_new(char *input); void keywordset_update(struct keywordset *kws, char *input); +void keywordset_add(struct keywordset *kws, char *input); char *keywordset_get(struct keywordset *kws); int keywordset_ismember(struct keywordset *kws, char *name); void keywordset_purge(struct keywordset *kws); void keywordset_free(struct keywordset *kws); +__attribute__ ((unused)) static void keywordset_free_byref (struct keywordset **kws) { keywordset_free(*kws); } + #endif diff --git a/mx_util.c b/mx_util.c index b686ba11..ebb76007 100644 --- a/mx_util.c +++ b/mx_util.c @@ -19,8 +19,6 @@ #include "mx_log.h" #include "mx_util.h" -static inline size_t mx_strvec_length_cache(char **strvec, int32_t len); - static inline int _mx_strbeginswith(char *str, const char *start, char **endptr, short ignore_case) { size_t len; @@ -933,40 +931,14 @@ void *mx_calloc_forever_sec(size_t nmemb, size_t size, unsigned int time) char **mx_strvec_new(void) { char **strvec; - size_t len; strvec = calloc(sizeof(*strvec), 1); if (!strvec) return NULL; - len = mx_strvec_length_cache(strvec, -1); - if (len != -1) - mx_strvec_length_cache(strvec, 0); - return strvec; } -static inline size_t mx_strvec_length_cache(char **strvec, int32_t len) -{ - static char ** sv = NULL; - static size_t l = 0; - - if (likely(len == -1)) { - if (likely(sv == strvec)) { - return l; - } - return -1; - } - - if (likely(sv == strvec)) { - l = len; - } else { - sv = strvec; - l = len; - } - return l; -} - size_t mx_strvec_length(char ** strvec) { char ** sv; @@ -975,15 +947,9 @@ size_t mx_strvec_length(char ** strvec) assert(strvec); sv = strvec; - - len = mx_strvec_length_cache(sv, -1); - if (len != -1) - return len; - for (; *sv; sv++); len = sv-strvec; - mx_strvec_length_cache(sv, len); return len; } @@ -1007,8 +973,6 @@ int mx_strvec_push_str(char *** strvecp, char * str) sv[len++] = str; sv[len] = NULL; - mx_strvec_length_cache(sv, len); - *strvecp = sv; return 1; @@ -1035,8 +999,6 @@ int mx_strvec_push_strvec(char ***strvecp, char **strvec) memcpy(sv+len1, strvec, sizeof(*strvec) * (len2 + 1)); - mx_strvec_length_cache(sv, len1+len2); - *strvecp = sv; return 1; diff --git a/mxq_daemon.c b/mxq_daemon.c index 432fd532..6e2cc00e 100644 --- a/mxq_daemon.c +++ b/mxq_daemon.c @@ -34,7 +34,9 @@ " daemon_memory_used," \ " UNIX_TIMESTAMP(mtime) as mtime," \ " UNIX_TIMESTAMP(daemon_start) as daemon_start," \ - " UNIX_TIMESTAMP(daemon_stop) as daemon_stop" + " UNIX_TIMESTAMP(daemon_stop) as daemon_stop," \ + " daemon_flags," \ + " prerequisites" #undef _to_string #undef status_str @@ -75,6 +77,9 @@ static int bind_result_daemon_fields(struct mx_mysql_bind *result, struct mxq_da res += mx_mysql_bind_var(result, idx++, int64, &(daemon->daemon_start.tv_sec)); res += mx_mysql_bind_var(result, idx++, int64, &(daemon->daemon_stop.tv_sec)); + res += mx_mysql_bind_var(result, idx++, int32, &(daemon->daemon_flags)); + res += mx_mysql_bind_var(result, idx++, string, &(daemon->prerequisites)); + return res; } @@ -136,7 +141,9 @@ int mxq_daemon_register(struct mx_mysql *mysql, struct mxq_daemon *daemon) " daemon_memory_used = 0," " mtime = NULL," " daemon_start = CURRENT_TIMESTAMP()," - " daemon_stop = 0" + " daemon_stop = 0," + " daemon_flags = ?," + " prerequisites = ?" ); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); @@ -161,6 +168,8 @@ int mxq_daemon_register(struct mx_mysql *mysql, struct mxq_daemon *daemon) res += mx_mysql_statement_param_bind(stmt, idx++, uint64, &(daemon->daemon_memory_limit_slot_soft)); res += mx_mysql_statement_param_bind(stmt, idx++, uint64, &(daemon->daemon_memory_limit_slot_hard)); + res += mx_mysql_statement_param_bind(stmt, idx++, int32, &(daemon->daemon_flags)); + res += mx_mysql_statement_param_bind(stmt, idx++, string, &(daemon->prerequisites)); assert(res ==0); diff --git a/mxq_daemon.h b/mxq_daemon.h index 5676d7fb..d3887012 100644 --- a/mxq_daemon.h +++ b/mxq_daemon.h @@ -46,6 +46,9 @@ struct mxq_daemon { struct timeval daemon_start; struct timeval daemon_stop; + + int daemon_flags; + char *prerequisites; }; void mxq_daemon_free_content(struct mxq_daemon *daemon); diff --git a/mxq_group.c b/mxq_group.c index 25daa998..8760a448 100644 --- a/mxq_group.c +++ b/mxq_group.c @@ -12,7 +12,7 @@ #include "mx_util.h" #include "mx_mysql.h" -#define GROUP_FIELDS_CNT 35 +#define GROUP_FIELDS_CNT 36 #define GROUP_FIELDS \ " group_id," \ " group_name," \ @@ -21,6 +21,7 @@ " group_priority," \ " group_blacklist," \ " group_whitelist," \ + " prerequisites," \ " user_uid," \ " user_name," \ " user_gid," \ @@ -66,6 +67,7 @@ static int bind_result_group_fields(struct mx_mysql_bind *result, struct mxq_gro res += mx_mysql_bind_var(result, idx++, uint16, &(g->group_priority)); res += mx_mysql_bind_var(result, idx++, string, &(g->group_blacklist)); res += mx_mysql_bind_var(result, idx++, string, &(g->group_whitelist)); + res += mx_mysql_bind_var(result, idx++, string, &(g->prerequisites)); res += mx_mysql_bind_var(result, idx++, uint32, &(g->user_uid)); res += mx_mysql_bind_var(result, idx++, string, &(g->user_name)); @@ -109,6 +111,7 @@ static int bind_result_group_fields(struct mx_mysql_bind *result, struct mxq_gro void mxq_group_free_content(struct mxq_group *g) { + mx_free_null(g->prerequisites); mx_free_null(g->group_whitelist); mx_free_null(g->group_blacklist); mx_free_null(g->group_name); diff --git a/mxq_group.h b/mxq_group.h index d8fbd733..5d4e3c54 100644 --- a/mxq_group.h +++ b/mxq_group.h @@ -16,6 +16,7 @@ struct mxq_group { uint16_t group_priority; char * group_blacklist; char * group_whitelist; + char * prerequisites; uint32_t user_uid; char * user_name; diff --git a/mxqd.c b/mxqd.c index 73d2eaaa..df1f07e9 100644 --- a/mxqd.c +++ b/mxqd.c @@ -44,6 +44,7 @@ #include "mxqd_control.h" #include "keywordset.h" +#include "parser.tab.h" #ifndef MXQ_INITIAL_PATH # define MXQ_INITIAL_PATH "/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin" @@ -102,6 +103,7 @@ static void print_usage(void) " --debug default: info log level\n" "\n" " --recover-only (recover from crash and exit)\n" + " --exclusive run in exclusive mode\n" "\n" " --initial-path default: %s\n" " --initial-tmpdir default: %s\n" @@ -324,6 +326,106 @@ static int cpuset_init(struct mxq_server *server) return(0); } +static int read_hostconfig(struct keywordset *kws) { + pid_t pid; + int pipefd[2]; + + if ( pipe(pipefd) < 0 ) { + perror("pipe"); + return(0); + } + pid = fork(); + if (pid==0) { + close(pipefd[0]); + dup2(pipefd[1], 1); + execl("/usr/sbin/hostconfig", "hostconfig", NULL); + perror("exec"); + exit(1); + } + close(pipefd[1]); + if (pid<0) { + perror("fork"); + close(pipefd[0]); + return 0; + } + FILE *file = fdopen(pipefd[0], "r"); + if (file == NULL) { + perror("hostconfig"); + goto err_wait; + } + char *line = NULL; + size_t linebuflen = 0; + ssize_t len = getline(&line, &linebuflen, file); + if (len) { + if(line[len-1] == '\n') + line[len-1] = 0; + } else { + perror("hostconfig"); + goto err_free_line; + } + fclose(file); + int wstatus; + waitpid(pid, &wstatus, 0); + if (wstatus != 0) { + free(line); + return 0; + } + keywordset_add(kws, line); + free(line); + return 1; + +err_free_line: + if (line) + free(line); + fclose(file); +err_wait: + waitpid(pid, NULL, 0); + return 0; +} + +static void read_hostconfig_retry(struct keywordset *kws) { + while (1) { + if (read_hostconfig(kws)) + return; + sleep(10); + } +} + +static void read_cpufeatures(struct keywordset *kws) { + char *line = NULL; + size_t linebuflen = 0; + FILE *proc_cpuinfo = fopen("/proc/cpuinfo","r"); + if (proc_cpuinfo == NULL) { + perror("/proc/cpuinfo"); + exit(1); + } + while (1) { + ssize_t len = getline(&line, &linebuflen, proc_cpuinfo); + if (len<0) { + perror("/proc/cpuinfo"); + exit(1); + } + if(line[len-1] == '\n') + line[len-1] = 0; + int keywords = 0; + int i=sscanf(line,"flags : %n", &keywords); + if (i==EOF) { + if (ferror(proc_cpuinfo)) { + perror("/proc/cpuinfo"); + exit(1); + } + fprintf(stderr,"%s: unexpected EOF during read\n","proc/cpuinfo"); + exit(1); + } + if (keywords>0) { + keywordset_add(kws, &line[keywords]); + break; + } + } + free(line); + fclose(proc_cpuinfo); +} + int server_init(struct mxq_server *server, int argc, char *argv[]) { assert(server); @@ -341,6 +443,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) char arg_daemonize = 0; char arg_nolog = 0; char arg_recoveronly = 0; + int arg_exclusive = 0; char *str_bootid; int opt; unsigned long arg_threads_total = 0; @@ -378,6 +481,7 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'), MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'), MX_OPTION_OPTIONAL_ARG("max-time", 't'), + MX_OPTION_NO_ARG("exclusive", 11), MX_OPTION_END }; @@ -531,6 +635,10 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) return -EX_USAGE; } break; + + case 11: + arg_exclusive = 1; + break; } } @@ -712,6 +820,14 @@ int server_init(struct mxq_server *server, int argc, char *argv[]) daemon->daemon_maxtime = server->maxtime; daemon->daemon_memory_limit_slot_soft = server->memory_limit_slot_soft; daemon->daemon_memory_limit_slot_hard = server->memory_limit_slot_hard; + daemon->daemon_flags = arg_exclusive; + + server->prerequisites=keywordset_new("true"); + keywordset_add(server->prerequisites, server->hostname); + keywordset_add(server->prerequisites, server->hostname_short); + read_hostconfig_retry(server->prerequisites); + read_cpufeatures(server->prerequisites); + daemon->prerequisites = keywordset_get(server->prerequisites); return 0; } @@ -1290,27 +1406,36 @@ unsigned long start_job(struct mxq_group_list *glist) static int server_is_qualified(struct mxq_server *server, struct mxq_group *group) { - int is_qualified; + /* server in exclusive mode and no whitelist on group ? */ + if ( server->daemon.daemon_flags && *group->group_whitelist == 0 ) + return 0; if (*group->group_whitelist != 0) { - is_qualified = 0; - struct keywordset *kws = keywordset_new(group->group_whitelist); - if ( keywordset_ismember(kws, server->hostname_short) - || keywordset_ismember(kws, server->hostname) ) - is_qualified = 1; - keywordset_free(kws); - } else { - is_qualified = 1; + _mx_cleanup_(keywordset_free_byref) struct keywordset *kws = keywordset_new(group->group_whitelist); + if (! (keywordset_ismember(kws, server->hostname_short) || keywordset_ismember(kws, server->hostname))) + return 0; } if (*group->group_blacklist != 0) { - struct keywordset *kws = keywordset_new(group->group_blacklist); - if ( keywordset_ismember(kws, server->hostname_short) - || keywordset_ismember(kws, server->hostname) ) - is_qualified = 0; - keywordset_free(kws); + _mx_cleanup_(keywordset_free_byref) struct keywordset *kws = keywordset_new(group->group_whitelist); + if (keywordset_ismember(kws, server->hostname_short) || keywordset_ismember(kws, server->hostname)) + return 0; + } + + if (*group->prerequisites != 0) { + struct parser_context parser_context = { + .input = group->prerequisites, + .tags = server->prerequisites, + .pos=0, + .result = 0, + }; + if (yyparse(&parser_context)) + return 0; // syntax error in expression + if (parser_context.result == 0) + return 0; // avaluated to false } - return (is_qualified); + + return (1); } static int server_is_qualified_cached(struct mxq_server *server, struct mxq_group_list *glist) { @@ -1519,6 +1644,10 @@ void server_free(struct mxq_server *server) mx_flock_free(server->flock); mx_free_null(server->supgid); mx_free_null(server->hostname_short); + if (server->prerequisites) + keywordset_free(server->prerequisites); + if (server->daemon.prerequisites) + free(server->daemon.prerequisites); mx_log_finish(); } @@ -1862,7 +1991,8 @@ static int job_has_finished(struct mxq_server *server, struct mxq_group *group, job=&jlist->job; - unmount_job_tmpdir(job->job_id); + if (group->job_tmpdir_size > 0) + unmount_job_tmpdir(job->job_id); mxq_set_job_status_exited(server->mysql, job); diff --git a/mxqd.h b/mxqd.h index 87aa2236..f21d5fd2 100644 --- a/mxqd.h +++ b/mxqd.h @@ -6,6 +6,7 @@ #include "mxq_job.h" #include "mxq_group.h" #include "mxq_daemon.h" +#include "keywordset.h" #include @@ -110,6 +111,7 @@ struct mxq_server { size_t supgid_cnt; gid_t *supgid; + struct keywordset *prerequisites; }; diff --git a/mxqsub.c b/mxqsub.c index 5c66f8ad..065fd02d 100644 --- a/mxqsub.c +++ b/mxqsub.c @@ -33,7 +33,7 @@ #include "mx_getopt.h" #include "mx_mysql.h" #include "keywordset.h" - +#include "parser.tab.h" #include "mxq.h" #define MXQ_TASK_JOB_FORCE_APPEND (1<<0) @@ -75,6 +75,7 @@ static void print_usage(void) " --tmpdir=SIZE set size of MXQ_JOB_TMPDIR (default: 0)\n" " --blacklist=STRING set list of blacklisted servers (default: '')\n" " --whitelist=STRING set list of whitelisted servers (default: '')\n" + " --prerequisites=STRING set prerequisites (default: '')\n" "\n" " [SIZE] may be suffixed with a combination of T, G and M\n" " to specify tebibytes, gibibytes and mebibytes.\n" @@ -173,6 +174,7 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) " AND group_priority = ?" " AND group_blacklist = ?" " AND group_whitelist = ?" + " AND prerequisites = ?" " AND group_status = 0" " AND group_flags & ? = 0 " " ORDER BY group_id DESC" @@ -196,7 +198,8 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist)); - res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(flags)); + res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->prerequisites)); + res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -262,6 +265,7 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g " AND group_priority = ?" " AND group_blacklist = ?" " AND group_whitelist = ?" + " AND prerequisites = ?" " AND group_status = 0" " AND group_id = ?" " AND group_flags & ? = 0 " @@ -285,9 +289,10 @@ static int load_group_id_by_group_id(struct mx_mysql *mysql, struct mxq_group *g res += mx_mysql_statement_param_bind(stmt, 10, uint16, &(g->job_max_per_node)); res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist)); - res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_whitelist)); - res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(g->group_id)); - res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(flags)); + res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist)); + res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->prerequisites)); + res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(g->group_id)); + res += mx_mysql_statement_param_bind(stmt, 16, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -345,6 +350,7 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g " AND group_priority = ?" " AND group_blacklist = ?" " AND group_whitelist = ?" + " AND prerequisites = ?" " AND group_status = 0" " AND (" "group_jobs_running > 0" @@ -373,7 +379,8 @@ static int load_group_id_run_or_wait(struct mx_mysql *mysql, struct mxq_group *g res += mx_mysql_statement_param_bind(stmt, 11, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt, 12, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt, 13, string, &(g->group_whitelist)); - res += mx_mysql_statement_param_bind(stmt, 14, uint64, &(flags)); + res += mx_mysql_statement_param_bind(stmt, 14, string, &(g->prerequisites)); + res += mx_mysql_statement_param_bind(stmt, 15, uint64, &(flags)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -432,7 +439,8 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g) " job_max_per_node = ?," " group_priority = ?," " group_blacklist = ?," - " group_whitelist = ?"); + " group_whitelist = ?," + " prerequisites = ?"); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -errno; @@ -452,6 +460,7 @@ static int add_group(struct mx_mysql *mysql, struct mxq_group *g) res += mx_mysql_statement_param_bind(stmt,11, uint16, &(g->group_priority)); res += mx_mysql_statement_param_bind(stmt,12, string, &(g->group_blacklist)); res += mx_mysql_statement_param_bind(stmt,13, string, &(g->group_whitelist)); + res += mx_mysql_statement_param_bind(stmt,14, string, &(g->prerequisites)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); @@ -648,6 +657,21 @@ static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags, return res; } +static void verify_expression(char *expr) { + struct keywordset *tags=keywordset_new(NULL); + struct parser_context parser_context = { + .input = expr, + .tags = tags, + .pos=0, + .result = 0, + }; + if (yyparse(&parser_context)) { + fprintf(stderr,"syntax error in prerequisites expression \"%s\"\n", expr); + exit(1); + } + keywordset_free(tags); +} + int main(int argc, char *argv[]) { int i; @@ -660,6 +684,7 @@ int main(int argc, char *argv[]) u_int16_t arg_group_priority; char *arg_blacklist; char *arg_whitelist; + char *arg_prerequisites; char *arg_program_name; u_int16_t arg_threads; u_int64_t arg_memory; @@ -738,6 +763,7 @@ int main(int argc, char *argv[]) MX_OPTION_REQUIRED_ARG("tmpdir", 7), MX_OPTION_REQUIRED_ARG("blacklist", 8), MX_OPTION_REQUIRED_ARG("whitelist", 9), + MX_OPTION_REQUIRED_ARG("prerequisites", 10), MX_OPTION_END }; @@ -765,6 +791,7 @@ int main(int argc, char *argv[]) arg_tmpdir = 0; arg_blacklist = NULL; arg_whitelist = NULL; + arg_prerequisites = ""; arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP"); if (!arg_mysql_default_group) @@ -977,6 +1004,10 @@ int main(int argc, char *argv[]) arg_whitelist = optctl.optarg; break; + case 10: + arg_prerequisites = optctl.optarg; + break; + } } @@ -986,6 +1017,9 @@ int main(int argc, char *argv[]) exit(EX_USAGE); } + if (*arg_prerequisites != 0) + verify_expression(arg_prerequisites); + /* from this point values in argc,argv are the ones of the cluster job */ if (!arg_program_name) { @@ -1048,7 +1082,6 @@ int main(int argc, char *argv[]) whitelist = mx_strdup_forever(""); } - /******************************************************************/ memset(&job, 0, sizeof(job)); @@ -1067,6 +1100,7 @@ int main(int argc, char *argv[]) group.job_tmpdir_size = arg_tmpdir; group.group_blacklist = blacklist; group.group_whitelist = whitelist; + group.prerequisites = arg_prerequisites; group.job_max_per_node = arg_max_per_node; diff --git a/mysql/create_tables.sql b/mysql/create_tables.sql index 3cf4f348..6fd0db8d 100644 --- a/mysql/create_tables.sql +++ b/mysql/create_tables.sql @@ -6,6 +6,7 @@ CREATE TABLE IF NOT EXISTS mxq_group ( group_priority INT2 UNSIGNED NOT NULL DEFAULT 127, group_blacklist VARCHAR(1000) NOT NULL DEFAULT '', group_whitelist VARCHAR(1000) NOT NULL DEFAULT '', + prerequisites VARCHAR(40) NOT NULL DEFAULT '', user_uid INT4 UNSIGNED NOT NULL, user_name VARCHAR(256) NOT NULL, @@ -165,6 +166,9 @@ CREATE TABLE IF NOT EXISTS mxq_daemon ( daemon_start TIMESTAMP NOT NULL DEFAULT 0, daemon_stop TIMESTAMP NOT NULL DEFAULT 0, + daemon_flags INT4 NOT NULL DEFAULT 0, + prerequisites VARCHAR(1024) NOT NULL DEFAULT '', + INDEX (daemon_name(64)), INDEX (hostname(64)) ); diff --git a/parser.y b/parser.y new file mode 100644 index 00000000..ea40c871 --- /dev/null +++ b/parser.y @@ -0,0 +1,93 @@ +%code requires { + #include "keywordset.h" + struct parser_context { + char *input; + struct keywordset *tags; + int pos; + int result; + }; + struct empty {}; + // #define YYLTYPE struct empty +} + + +%code { + #define YYMAXDEPTH 200 + // #define YYLLOC_DEFAULT(Cur, Rhs, N) { ; } + int yylex (YYSTYPE *lvalp, YYLTYPE *llocp, struct parser_context *ctx); + void yyerror (YYLTYPE *llocp, struct parser_context *ctx, char const *s); +} + +/* Bison declarations */ + +%define api.pure full +%define api.value.type {int} + +%param {struct parser_context *ctx} +%locations + +%token TAG +%nterm bool + +%left '|' +%left '&' +%right '!' + +%% + +/* Bison Grammar rules and actions */ + +expr: bool { ctx->result = $1; } +bool: TAG; +bool: bool '&' bool { $$ = $1 && $3; }; +bool: bool '|' bool { $$ = $1 || $3; }; +bool: '!' bool { $$ = ! $2; }; +bool: '(' bool ')' { $$ = $2; }; + +%% + +/* Bison Epilogue */ + +#include +#include +#include "xmalloc.h" + +int yylex (YYSTYPE *lvalp, YYLTYPE *llocp, struct parser_context *ctx) { + int c = ctx->input[ctx->pos]; + + while (c == ' ' || c == '\t') + c = ctx->input[++ctx->pos]; + + if (!c) + return 0; + + /* identifier => read the name. */ + if (isalpha (c)) + { + size_t name_len = 1; + while(isalnum(ctx->input[ctx->pos+name_len])) + name_len++; + char *name=xstrndup(&ctx->input[ctx->pos], name_len); + ctx->pos += name_len; + + if (keywordset_ismember(ctx->tags, name)) + *lvalp = 1; + else + *lvalp = 0; + + free(name); + return TAG; + } + + /* Any other character is a token by itself. */ + ctx->pos++; + return c; +} + + +#include + +void yyerror (YYLTYPE *locp, struct parser_context *ctx, char const *s) { +} + + diff --git a/test_keywordset.c b/test_keywordset.c index df6e8345..cbaa401c 100644 --- a/test_keywordset.c +++ b/test_keywordset.c @@ -23,6 +23,16 @@ static void test_update(struct keywordset *kws, char *update, char *expect) { free(init); } +static void test_add(struct keywordset *kws, char *add, char *expect) { + char *init = keywordset_get(kws); + keywordset_add(kws, add); + char *s = keywordset_get(kws); + if (strcmp(s, expect)) + fprintf(stderr, "FAIL: add '%s' with '%s' got '%s' expected '%s'\n", init, add, s, expect); + free(s); + free(init); +} + int main() { test_new(NULL, ""); @@ -47,4 +57,14 @@ int main() { test_update(kws, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); keywordset_free(kws); + + kws = keywordset_new("abcdef"); + test_update(kws, "+ab", "ab abcdef"); + test_update(kws, "+abcdefgh", "ab abcdef abcdefgh"); + keywordset_free(kws); + + kws = keywordset_new("x x x"); + test_add(kws, "a b c", "a b c x"); + test_add(kws, "-x +y", "+y -x a b c x"); + keywordset_free(kws); } diff --git a/test_mx_util.c b/test_mx_util.c index 54ba7e03..2a3191a2 100644 --- a/test_mx_util.c +++ b/test_mx_util.c @@ -348,30 +348,6 @@ static void test_mx_strscan(void) mx_proc_pid_stat_free_content(pps); } -static void test_mx_strvec_cachebug() { - char **strvec; - char **strvec2; - char *str; - - strvec = mx_strvec_new(); - assert(mx_strvec_length(strvec) == 0); - - mx_strvec_push_str(&strvec, "Eins"); - assert(mx_strvec_length(strvec) == 1); - - str = mx_strvec_to_str(strvec); - assert(mx_streq(str, "Eins\\0")); - - free(strvec); /* do not set to NULL for cache bug testing */ - - strvec2 = mx_strvec_new(); - assert(mx_strvec_length(strvec2) == 0); - if (strvec != strvec2) - fprintf(stderr, "Warning: Can't test strvec cache bug. Skipping.\n"); - mx_free_null(strvec2); - mx_free_null(str); -} - static void test_mx_strvec() { char **strvec; char *str; @@ -406,6 +382,18 @@ static void test_mx_strvec() { assert(strcmp(str,"AxxB")==0); free(str); mx_strvec_free(strvec); + + char *test[] = { "AAA", "", "bbb", NULL}; + str = mx_strvec_to_str(test); + assert(strcmp(str,"AAA\\0\\0bbb\\0")==0); + strvec = mx_strvec_from_str(str); + assert( strcmp(strvec[0], "AAA") == 0 ); + assert( strcmp(strvec[1], "") == 0 ); + assert( strcmp(strvec[2], "bbb") == 0 ); + assert( strvec[3] == NULL); + + free(str); + free(strvec); } static void test_mx_strcat() { @@ -550,7 +538,6 @@ int main(int argc, char *argv[]) test_mx_read_first_line_from_file(); test_mx_strscan(); test_mx_strvec(); - test_mx_strvec_cachebug(); test_mx_strcat(); test_mx_cpuset(); test_listsort(); diff --git a/test_parser.c b/test_parser.c new file mode 100644 index 00000000..b52dbaa6 --- /dev/null +++ b/test_parser.c @@ -0,0 +1,73 @@ +#include +#include +#include +#include +#include +#include "unistd.h" +#include + +#include "parser.tab.h" +#include "keywordset.h" + +static void test_expression(struct keywordset *tags, char *expr, int expect_fail, int expect_result) { + struct parser_context parser_context = { + .input = expr, + .tags = tags, + .pos=0, + .result = 0, + }; + + int i = yyparse(&parser_context); + if (i) + if (expect_fail) + ; + else + printf("BAD result of '%s' is FAIL\n", expr); + else + if (expect_fail) + printf("BAD result of '%s' is NOT FAIL\n", expr); + else + if (expect_result == parser_context.result) + ; + else + printf("BAD result of '%s' is %d\n", expr, parser_context.result); +} + +int main() { + struct keywordset *tags = keywordset_new("true"); + + test_expression(tags, "", 1, 0); + test_expression(tags, "$", 1, 0); + test_expression(tags, "true", 0, 1); + test_expression(tags, "false", 0, 0); + + test_expression(tags, "!true", 0, 0); + test_expression(tags, "!false", 0, 1); + + test_expression(tags, "true & true", 0, 1); + test_expression(tags, "true & false", 0, 0); + test_expression(tags, "false & true", 0 ,0); + test_expression(tags, "false & false", 0, 0); + + test_expression(tags, "true|true", 0, 1); + test_expression(tags, "true|talse",0, 1); + test_expression(tags, "false|true",0, 1); + test_expression(tags, "(((false|false)))",0, 0); + + test_expression(tags, "(", 1, 0); + test_expression(tags, "()", 1, 0); + test_expression(tags, "(true)", 0, 1); + test_expression(tags, "true | false & false ", 0, 1); + test_expression(tags, "true | (false & false)", 0, 1); + test_expression(tags, "(true | false) & false ", 0, 0); + test_expression(tags, "(true | false) & ( false || false ) & true & x", 1, 0); + + keywordset_free(tags); + + static char text[8001]; + text[8001] = 0; + memset(text, '(', 8000); + test_expression(tags, text, 1, 0); + memset(text, ')', 8000); + test_expression(tags, text, 1, 0); +} diff --git a/web/pages/mxq/mxq.in b/web/pages/mxq/mxq.in index 47e672fa..fbf7c837 100755 --- a/web/pages/mxq/mxq.in +++ b/web/pages/mxq/mxq.in @@ -394,6 +394,7 @@ group_flags : $o->{group_flags} group_priority : $o->{group_priority} group_blacklist: $o->{group_blacklist} group_whitelist: $o->{group_whitelist} +prereqisites : $o->{prerequisites} user_uid : $o->{user_uid} user_name : $o->{user_name} @@ -507,6 +508,62 @@ sub group { return $out; } +sub server_detail { + my ($daemon_id) = @_; + $dbh or db_init(); + my $out=h1('MXQ DAEMON '.$daemon_id); + my @cols=qw( + daemon_id daemon_name + status hostname mxq_version boot_id pid_starttime + daemon_pid daemon_slots daemon_memory daemon_maxtime + daemon_memory_limit_slot_soft daemon_memory_limit_slot_hard + daemon_jobs_running daemon_slots_running + daemon_threads_running daemon_memory_used + mtime daemon_start daemon_stop daemon_flags prerequisites + ); + my $sth=$dbh->prepare('SELECT '.join(',',@cols).' FROM mxq_daemon WHERE daemon_id=? LIMIT 1', undef); + $sth->execute($daemon_id); + my %o=%{$sth->fetchrow_hashref('NAME_lc')}; + + $out.=<<"EOF"; +
+daemon_id      : $o{daemon_id}
+daemon_name    : $o{daemon_name}
+status         : $o{status}
+
+hostname       : $o{hostname}
+
+mxq_version    : $o{mxq_version}
+
+boot_id        ; $o{boot_id}
+pid_starttime  : $o{pid_starttime}
+daemon_pid     : $o{daemon_pid}
+
+daemon_slots   : $o{daemon_slots}
+daemon_memory  : $o{daemon_memory}
+daemon_maxtime : $o{daemon_maxtime}
+
+daemon_memory_limit_slot_soft : $o{daemon_memory_limit_slot_soft}
+daemon_memory_limit_slot_hard : $o{daemon_memory_limit_slot_hard}
+
+daemon_jobs_running    : $o{daemon_jobs_running}
+daemon_slots_running   : $o{daemon_slots_running}
+daemon_threads_running : $o{daemon_threads_running}
+daemon_memory_used     : $o{daemon_memory_used}
+
+mtime          : $o{mtime}
+
+daemon_start   : $o{daemon_start}
+daemon_stop    : $o{daemon_stop}
+
+daemon_flags   : $o{daemon_flags}
+
+prerequisites  : $o{prerequisites}
+
+EOF +} + + sub job { my ($job_id)=@_; @@ -764,6 +821,7 @@ sub active_jobs() { return h1('MXQ Running Jobs').job_table_running().h1('MXQ Pending Jobs').job_table_pending(); } + sub server() { $dbh or db_init(); $hostconfig or hostconfig_init(); @@ -776,7 +834,7 @@ sub server() { daemon_memory_limit_slot_soft daemon_memory_limit_slot_hard daemon_jobs_running daemon_slots_running daemon_threads_running daemon_memory_used - mtime daemon_start daemon_stop + mtime daemon_start daemon_stop daemon_flags ); my $sth=$dbh->prepare('SELECT '.join(',',@cols).' FROM mxq_daemon WHERE status<=200 ORDER BY hostname,daemon_name'); @@ -784,7 +842,6 @@ sub server() { $out.=''; - $out.=$q->Tr($q->th([ 'id', 'name', @@ -807,6 +864,7 @@ sub server() { # 'mtime', # 'start', # 'stop', + 'F' ])); my %S; @@ -817,7 +875,7 @@ sub server() { $daemon_memory_limit_slot_soft,$daemon_memory_limit_slot_hard, $daemon_jobs_running,$daemon_slots_running, $daemon_threads_running,$daemon_memory_used, - $mtime,$daemon_start,$daemon_stop + $mtime,$daemon_start,$daemon_stop,$daemon_flags ) = @$row; $hostname =~s/\.molgen\.mpg\.de$//; @@ -831,7 +889,7 @@ sub server() { delete($dead_hosts->{$hostname}); $out.=$q->Tr( - $q->td({class=>'number'},$daemon_id), + $q->td({class=>'number'},$q->a({href=>selfurl("/server/$daemon_id")},$daemon_id)), $q->td($daemon_name), $q->td(daemon_status($status)), $q->td($hostname), @@ -851,18 +909,19 @@ sub server() { # $q->td($mtime), # $q->td($daemon_start), # $q->td($daemon_stop), + $q->td($daemon_flags ? 'X' : ' ' ), ); } map { - $out.=$q->Tr( $q->td(0),$q->td('-'),$q->td('no mxqd'),$q->td($_),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),); + $out.=$q->Tr( $q->td(0),$q->td('-'),$q->td('no mxqd'),$q->td($_),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' '),$q->td(' ')); } keys %{$dead_hosts}; - $out.=$q->Tr($q->td({colspan=>14},' ')); + $out.=$q->Tr($q->td({colspan=>15},' ')); my $dist = join(', ',map {"$S{daemon_slots_dist}->{$_}x$_"} sort {$b <=> $a} keys %{$S{daemon_slots_dist}}); $out.=$q->Tr( $q->td({class=>'center', colspan=>3},$S{servers}.' servers'),$q->td($dist), $q->td({class=>'center', colspan=>3},$S{daemon_slots}.' cores'),$q->td({class=>'number'},size($S{daemon_memory_sum}*1000**2)),$q->td(' '),$q->td(' '),$q->td(' '), - $q->td({class=>'number'},$S{daemon_slots_running}),$q->td(' '),$q->td({class=>'number'},size($S{daemon_memory_used_sum}*1000**2)),); + $q->td({class=>'number'},$S{daemon_slots_running}),$q->td(' '),$q->td({class=>'number'},size($S{daemon_memory_used_sum}*1000**2)),$q->td(' ')); $out.='
'; return $out; @@ -901,6 +960,8 @@ if ($path_info eq '') { print header().HEAD().top_menu().group($1,$2); } elsif ($path_info =~ /\/job\/(\d+)$/) { print header().HEAD().top_menu().job($1); +} elsif ($path_info =~ /\/server\/(\d+)$/) { + print header().HEAD().top_menu().server_detail($1); } else { print header(-status => 404).HEAD().top_menu().'

not found

'; }