diff --git a/mxq.h b/mxq.h index aac59fe1..418331d2 100644 --- a/mxq.h +++ b/mxq.h @@ -52,6 +52,14 @@ # define MXQ_LOGDIR "/var/log" #endif +#ifndef MXQ_MAX_ACTIVE_GROUPS_PER_USER +# define MXQ_MAX_ACTIVE_GROUPS_PER_USER 100 +#endif + +#ifndef MXQ_MAX_PENDING_JOBS_PER_GROUP +# define MXQ_MAX_PENDING_JOBS_PER_GROUP 10000 +#endif + static void mxq_print_generic_version(void) { printf( diff --git a/mxqsub.c b/mxqsub.c index 4624f9e9..07322e61 100644 --- a/mxqsub.c +++ b/mxqsub.c @@ -152,7 +152,8 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) stmt = mx_mysql_statement_prepare(mysql, "SELECT" - " group_id" + " group_id," + " group_jobs_inq" " FROM mxq_group " " WHERE group_name = ?" " AND user_uid = ?" @@ -198,6 +199,7 @@ static int load_group_id(struct mx_mysql *mysql, struct mxq_group *g) if (num_rows) { mx_mysql_statement_result_bind(stmt, 0, uint64, &(g->group_id)); + mx_mysql_statement_result_bind(stmt, 1, uint64, &(g->group_jobs_inq)); res = mx_mysql_statement_fetch(stmt); if (res < 0) { @@ -513,6 +515,43 @@ static int add_job(struct mx_mysql *mysql, struct mxq_job *j) return (int)num_rows; } +static int get_active_groups_for_user(struct mx_mysql *mysql, char *username) +{ + struct mx_mysql_stmt *stmt; + int res; + unsigned long long num_rows; + uint64_t count; + + stmt = mx_mysql_statement_prepare(mysql, + "SELECT" + " count(*)" + " FROM mxq_group" + " WHERE user_name = ?" + " AND (group_jobs_inq > 0 OR group_jobs_running > 0)"); + if (!stmt) { + mx_log_err("mx_mysql_statement_prepare(): %m"); + return -errno; + } + mx_mysql_statement_param_bind(stmt, 0, string, &username); + res = mx_mysql_statement_execute(stmt, &num_rows); + if (res < 0) { + mx_log_err("mx_mysql_statement_execute(): %m"); + mx_mysql_statement_close(&stmt); + return res; + } + assert(num_rows == 1); + mx_mysql_statement_result_bind(stmt, 0, uint64, &count); + res = mx_mysql_statement_fetch(stmt); + if (res < 0) { + mx_log_err("mx_mysql_statement_fetch(): %m"); + mx_mysql_statement_close(&stmt); + return res; + } + mx_mysql_statement_close(&stmt); + assert(countuser_name)>=MXQ_MAX_ACTIVE_GROUPS_PER_USER) { + mx_log_err("Limit of %d active groups reached for user %s",MXQ_MAX_ACTIVE_GROUPS_PER_USER,g->user_name); + return -(errno=EDQUOT); + } res = add_group(mysql, g); if (res < 0) return res; @@ -549,6 +592,11 @@ static int mxq_submit_task(struct mx_mysql *mysql, struct mxq_job *j, int flags, mx_log_info("The new job will be added to new group with group_id=%lu", g->group_id); } else { + mx_log_debug("pending: %lu",g->group_jobs_inq); + if (g->group_jobs_inq >= MXQ_MAX_PENDING_JOBS_PER_GROUP) { + mx_log_err("Limit of %d pending jobs per group reached for group_id %lu",MXQ_MAX_PENDING_JOBS_PER_GROUP,g->group_id); + return -(errno=EDQUOT); + } mx_log_info("The new job will be attached to existing group with group_id=%lu", g->group_id); } @@ -999,7 +1047,7 @@ int main(int argc, char *argv[]) mx_log_info("MySQL: Connection to database closed."); if (res < 0) { - if (res != -ENOENT) + if (res != -ENOENT && res != -EDQUOT) mx_log_err("Job submission failed: %m"); return 1; }