Skip to content
Navigation Menu
Toggle navigation
Sign in
In this repository
All GitHub Enterprise
↵
Jump to
↵
No suggested jump to results
In this repository
All GitHub Enterprise
↵
Jump to
↵
In this user
All GitHub Enterprise
↵
Jump to
↵
In this repository
All GitHub Enterprise
↵
Jump to
↵
Sign in
Resetting focus
You signed in with another tab or window.
Reload
to refresh your session.
You signed out in another tab or window.
Reload
to refresh your session.
You switched accounts on another tab or window.
Reload
to refresh your session.
Dismiss alert
{{ message }}
mariux
/
mxq
Public
forked from
mariux64/mxq
Notifications
You must be signed in to change notification settings
Fork
0
Star
0
Code
Pull requests
0
Actions
Projects
0
Security
Insights
Issues
Additional navigation options
Code
Pull requests
Actions
Projects
Security
Insights
Issues
Files
issues/issue15
manpages
mysql
web
.gitignore
LICENSE
Makefile
README.md
mx_flock.c
mx_flock.h
mx_getopt.c
mx_getopt.h
mx_log.c
mx_log.h
mx_mysql.c
mx_mysql.h
mx_util.c
mx_util.h
mxq.h
mxq_group.c
mxq_group.h
mxq_job.c
mxq_job.h
mxq_log.c
mxqadmin.c
mxqd.c
mxqd.h
mxqdctl-hostconfig.sh
mxqdump.c
mxqkill.c
mxqsub.c
os-release
test.c
test_mx_log.c
test_mx_mysql.c
test_mx_util.c
Breadcrumbs
mxq
/
mxqkill.c
Blame
Blame
Latest commit
mariux
mxqkill: Fix style in print_usage()
Aug 26, 2015
f54bf6e
·
Aug 26, 2015
History
History
510 lines (403 loc) · 14.7 KB
Breadcrumbs
mxq
/
mxqkill.c
Top
File metadata and controls
Code
Blame
510 lines (403 loc) · 14.7 KB
Raw
#define _GNU_SOURCE #include <stdio.h> #include <stdint.h> #include <errno.h> #include <assert.h> #include <unistd.h> #include <sys/types.h> #include <pwd.h> #include <assert.h> #include <sysexits.h> #include <ctype.h> #include <mysql.h> #include <string.h> #include "mx_log.h" #include "mx_util.h" #include "mx_mysql.h" #include "mx_getopt.h" #include "mxq_group.h" #include "mxq_job.h" #include "mxq.h" #define UINT64_UNSET (uint64_t)(-1) #define UINT64_ALL (uint64_t)(-2) #define UINT64_SPECIAL_MIN (uint64_t)(-2) #define UINT64_HASVALUE(x) ((x) < UINT64_SPECIAL_MIN) static void print_usage(void) { mxq_print_generic_version(); printf( "\n" "Usage:\n" " %s [options]\n" " %s [options] --group-id=GROUPID\n" "\n" "Synopsis:\n" " kill/cancel jobs running in MXQ cluster\n" "\n" "options:\n" "\n" " -g, --group-id=GROUPID cancel/kill group <GROUPID>\n" " -J, --job-id=JOBID cancel job <JOBID>\n" " -u, --user=NAME|UID cancel group for user. (root only)\n" "\n" " -v, --verbose be more verbose\n" " --debug set debug log level (default: warning log level)\n" "\n" " -V, --version\n" " -h, --help\n" "\n" "Change how to connect to the mysql server:\n" "\n" " -M, --mysql-default-file[=MYSQLCNF] (default: %s)\n" " -S, --mysql-default-group[=MYSQLGROUP] (default: %s)\n" "\n" "Environment:\n" " MXQ_MYSQL_DEFAULT_FILE change default for MYSQLCNF\n" " MXQ_MYSQL_DEFAULT_GROUP change default for MYSQLGROUP\n" "\n", program_invocation_short_name, program_invocation_short_name, MXQ_MYSQL_DEFAULT_FILE_STR, MXQ_MYSQL_DEFAULT_GROUP_STR ); } static int update_group_status_cancelled(struct mx_mysql *mysql, struct mxq_group *g) { struct mx_mysql_stmt *stmt = NULL; unsigned long long num_rows = 0; int res; assert(g->group_id); stmt = mx_mysql_statement_prepare(mysql, "UPDATE mxq_group SET" " group_status = " status_str(MXQ_GROUP_STATUS_CANCELLED) " WHERE group_id = ?" " AND group_status = " status_str(MXQ_GROUP_STATUS_OK) " AND user_uid = ?" 
" AND group_jobs-group_jobs_finished-group_jobs_failed-group_jobs_cancelled-group_jobs_unknown > 0" ); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -(errno=EIO); } res = mx_mysql_statement_param_bind(stmt, 0, uint64, &(g->group_id)); res += mx_mysql_statement_param_bind(stmt, 1, uint32, &(g->user_uid)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) mx_log_err("mx_mysql_statement_execute(): %m"); mx_mysql_statement_close(&stmt); if (res < 0) return -(errno=-res); assert(num_rows <= 1); return (int)num_rows; } static int update_job_status_cancelled_by_group(struct mx_mysql *mysql, struct mxq_group *g) { struct mx_mysql_stmt *stmt = NULL; unsigned long long num_rows = 0; int res; assert(g->group_id); stmt = mx_mysql_statement_prepare(mysql, "UPDATE mxq_job SET" " job_status = " status_str(MXQ_JOB_STATUS_CANCELLED) " WHERE group_id = ?" " AND job_status IN (" status_str(MXQ_JOB_STATUS_INQ) "," status_str(MXQ_JOB_STATUS_ASSIGNED) ")" " AND host_hostname = ''" " AND server_id = ''" " AND host_pid = 0" ); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -(errno=EIO); } res = mx_mysql_statement_param_bind(stmt, 0, uint64, &(g->group_id)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) mx_log_err("mx_mysql_statement_execute(): %m"); mx_mysql_statement_close(&stmt); if (res < 0) return -(errno=-res); return (int)num_rows; } static int update_job_status_cancelling_by_job_id_for_user(struct mx_mysql *mysql, uint64_t job_id, uint64_t user_uid) { struct mx_mysql_stmt *stmt = NULL; unsigned long long num_rows = 0; int res; assert(job_id); stmt = mx_mysql_statement_prepare(mysql, "UPDATE mxq_job AS j" " LEFT JOIN mxq_group AS g" " ON j.group_id = g.group_id" " SET" " job_status = " status_str(MXQ_JOB_STATUS_CANCELLING) " WHERE job_id = ?" " AND user_uid = ?" 
" AND job_status IN (" status_str(MXQ_JOB_STATUS_INQ) "," status_str(MXQ_JOB_STATUS_ASSIGNED) ")" " AND host_hostname = ''" " AND server_id = ''" " AND host_pid = 0" ); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -(errno=EIO); } res = mx_mysql_statement_param_bind(stmt, 0, uint64, &(job_id)); res += mx_mysql_statement_param_bind(stmt, 1, uint64, &(user_uid)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) mx_log_err("mx_mysql_statement_execute(): %m"); mx_mysql_statement_close(&stmt); if (res < 0) return -(errno=-res); return (int)num_rows; } static int update_job_status_cancelled_by_job_id(struct mx_mysql *mysql, uint64_t job_id) { struct mx_mysql_stmt *stmt = NULL; unsigned long long num_rows = 0; int res; assert(job_id); stmt = mx_mysql_statement_prepare(mysql, "UPDATE mxq_job SET" " job_status = " status_str(MXQ_JOB_STATUS_CANCELLED) " WHERE job_id = ?" " AND job_status = " status_str(MXQ_JOB_STATUS_CANCELLING) " AND host_hostname = ''" " AND server_id = ''" " AND host_pid = 0" ); if (!stmt) { mx_log_err("mx_mysql_statement_prepare(): %m"); return -(errno=EIO); } res = mx_mysql_statement_param_bind(stmt, 0, uint64, &(job_id)); assert(res == 0); res = mx_mysql_statement_execute(stmt, &num_rows); if (res < 0) mx_log_err("mx_mysql_statement_execute(): %m"); mx_mysql_statement_close(&stmt); if (res < 0) return -(errno=-res); return (int)num_rows; } int main(int argc, char *argv[]) { struct mx_mysql *mysql = NULL; struct mxq_group group; uid_t ruid, euid, suid; struct passwd *passwd; int res; uint64_t arg_group_id; uint64_t arg_job_id; char arg_debug; uint64_t arg_uid; char *arg_mysql_default_group; char *arg_mysql_default_file; int i; int opt; struct mx_getopt_ctl optctl; struct mx_option opts[] = { MX_OPTION_NO_ARG("help", 'h'), MX_OPTION_NO_ARG("version", 'V'), MX_OPTION_NO_ARG("debug", 5), MX_OPTION_NO_ARG("verbose", 'v'), MX_OPTION_REQUIRED_ARG("user", 'u'), MX_OPTION_REQUIRED_ARG("group-id", 'g'), 
MX_OPTION_REQUIRED_ARG("job-id", 'J'), MX_OPTION_OPTIONAL_ARG("mysql-default-file", 'M'), MX_OPTION_OPTIONAL_ARG("mysql-default-group", 'S'), MX_OPTION_END }; arg_mysql_default_group = getenv("MXQ_MYSQL_DEFAULT_GROUP"); if (!arg_mysql_default_group) arg_mysql_default_group = MXQ_MYSQL_DEFAULT_GROUP; arg_mysql_default_file = getenv("MXQ_MYSQL_DEFAULT_FILE"); if (!arg_mysql_default_file) arg_mysql_default_file = MXQ_MYSQL_DEFAULT_FILE; arg_group_id = 0; arg_job_id = 0; arg_debug = 0; arg_uid = UINT64_UNSET; mx_log_level_set(MX_LOG_NOTICE); res = getresuid(&ruid, &euid, &suid); assert(res != -1); mx_getopt_init(&optctl, argc-1, &argv[1], opts); optctl.flags = MX_FLAG_STOPONNOOPT; while ((opt=mx_getopt(&optctl, &i)) != MX_GETOPT_END) { if (opt == MX_GETOPT_ERROR) { exit(EX_USAGE); } switch (opt) { case 'V': mxq_print_generic_version(); exit(EX_USAGE); case 'h': print_usage(); exit(EX_USAGE); case 5: arg_debug = 1; mx_log_level_set(MX_LOG_DEBUG); break; case 'u': passwd = getpwnam(optctl.optarg); if (passwd) { arg_uid = passwd->pw_uid; break; } mx_log_debug("user %s not found. 
trying numeric uid.", optctl.optarg); if (!isdigit(*optctl.optarg)) { mx_log_err("Invalid argument for --user '%s': User not found.", optctl.optarg); exit(EX_USAGE); } if (mx_strtou64(optctl.optarg, &arg_uid) < 0 || arg_uid >= UINT64_SPECIAL_MIN) { if (arg_uid >= UINT64_SPECIAL_MIN) errno = ERANGE; mx_log_err("Invalid argument for --user '%s': %m", optctl.optarg); exit(EX_USAGE); } errno = 0; passwd = getpwuid(arg_uid); if (!passwd) { if (errno) mx_log_err("Can't load user '%s': %m"); else mx_log_err("Invalid argument for --user '%s': User not found.", optctl.optarg); exit(EX_USAGE); } break; case 'v': if (!arg_debug) mx_log_level_set(MX_LOG_INFO); break; case 'g': if (mx_strtou64(optctl.optarg, &arg_group_id) < 0 || !arg_group_id) { if (!arg_group_id) errno = ERANGE; mx_log_err("Invalid argument for --group-id '%s': %m", optctl.optarg); exit(1); } break; case 'J': if (mx_strtou64(optctl.optarg, &arg_job_id) < 0 || !arg_job_id) { if (!arg_job_id) errno = ERANGE; mx_log_err("Invalid argument for --job-id '%s': %m", optctl.optarg); exit(1); } break; case 'M': arg_mysql_default_file = optctl.optarg; break; case 'S': arg_mysql_default_group = optctl.optarg; break; } } MX_GETOPT_FINISH(optctl, argc, argv); if (!arg_group_id && !arg_job_id) { print_usage(); exit(EX_USAGE); } if (arg_uid == UINT64_UNSET) arg_uid = ruid; if (arg_uid != ruid && ruid != 0) { mx_log_err("Nice try, but only root user may kill jobs of other users! 
Better luck next time."); exit(EX_USAGE); } if (!passwd) { errno = 0; passwd = getpwuid(arg_uid); if (!passwd && errno) { mx_log_err("Can't load user with uid '%lu': %m", arg_uid); exit(EX_IOERR); } if (!passwd) { assert(arg_uid == ruid); mx_log_err("Can't load current user with uid '%lu'.", arg_uid); exit(EX_NOUSER); } } res = mx_mysql_initialize(&mysql); assert(res == 0); mx_mysql_option_set_default_file(mysql, arg_mysql_default_file); mx_mysql_option_set_default_group(mysql, arg_mysql_default_group); res = mx_mysql_connect_forever(&mysql); assert(res == 0); mx_log_info("MySQL: Connection to database established."); if (arg_job_id) { int res1, res2; res1 = update_job_status_cancelling_by_job_id_for_user(mysql, arg_job_id, passwd->pw_uid); res2 = update_job_status_cancelled_by_job_id(mysql, arg_job_id); mx_mysql_finish(&mysql); mx_log_info("MySQL: Connection to database closed."); if (res1 == -ENOENT) res1=0; if (res2 == -ENOENT) res1=0; if (res1 < 0) mx_log_err("setting status of job %lu to CANCELLING failed: %s", arg_job_id, strerror(-res1)); if (res2 < 0) mx_log_err("setting status of job %lu to CANCELLED failed: %s", arg_job_id, strerror(-res2)); if (res2 > 0) { mx_log_notice("Job %lu cancelled!", arg_job_id); return 0; } if (res1 > 0) { mx_log_notice("Updated status of job %lu to CANCELLING.", arg_job_id); if (res2 == 0) { mx_log_warning("Updating status of job %lu to CANCELLED failed. Job vanished. 
Please retry.", arg_job_id); return 2; } return 1; } if (res1 == 0 && res2 == 0) { mx_log_notice("No queued job with job_id=%lu for user %s(%d) found in q.", arg_job_id, passwd->pw_name, passwd->pw_uid); mx_log_warning("Killing a single job is not implemented yet."); mx_log_warning("See https://github.molgen.mpg.de/mariux64/mxq/issues/4 for more details."); return 0; } return 1; } if (arg_group_id) { memset(&group, 0, sizeof(group)); group.group_id = arg_group_id; group.user_uid = passwd->pw_uid; group.user_name = passwd->pw_name; res = update_group_status_cancelled(mysql, &group); if (res <= 0) { mx_mysql_finish(&mysql); mx_log_info("MySQL: Connection to database closed."); if (res == 0) mx_log_warning("no active group with group_id=%lu found for user=%s(%d)", group.group_id, group.user_name, group.user_uid); else mx_log_err("cancelling group failed: %m"); return 1; } assert(res == 1); res = update_job_status_cancelled_by_group(mysql, &group); mx_mysql_finish(&mysql); mx_log_info("MySQL: Connection to database closed."); if (res == -1 && errno == ENOENT) res=0; if (res >= 0) { if (res) mx_log_notice("cancelled %d jobs in group with group_id=%lu", res, group.group_id); mx_log_notice("marked all running jobs in group with group_id=%lu to be killed by executing servers.", group.group_id); mx_log_notice("deactivated group with group_id=%lu", group.group_id); return 0; } else { mx_log_err("cancelling jobs failed: %m"); return 1; } } mx_mysql_finish(&mysql); mx_log_info("MySQL: Connection to database closed."); return 1; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
You can’t perform that action at this time.