Skip to content
Permalink
master
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
executable file 1470 lines (1171 sloc) 36.3 KB
#! /usr/local/bin/perl
use warnings;
use strict;
# $Header: /home/buczek/cluster/clusterd,v 1.101 2013/11/20 19:55:35 root Exp $
our ($RCS_REVISION)='$Revision: 1.101 $'=~/([\d.]+)/;
#use lib ('/home/buczek/cluster/Donald/blib/lib');
use Donald::Tools qw(encode sign check_sign decode);
use Donald::Select;
use Donald::FileInfo;
use Donald::Select::INET;
use POSIX;
use IO::Pipe;
#--------------------------------------
package My::Cluster::Updown;
#
# monitor nodes
#
# Node table, keyed by host name. Each value is an array ref:
#   [ state , last_seen , expect_seq , @data ]
# state is 'UP' or 'DOWN', last_seen is a Donald::Select::time value and
# expect_seq is the sequence number expected in the next announcement.
our %H; # ( name=> [state , last_seen , expect_seq , @data ] , ... )
# True until the recovery timer armed in init() fires; while set, some
# "lost packet" / "running node" messages are suppressed in rx_hostannounce.
our $MONITOR_STARTING=1;
# host name => number of duplicate announcements counted by rx_hostannounce
our %DUPLICATES;
# data currently (offsets into @data; the index inside a %H record is 3+offset):
# 0: float load-average
# 1: clusterd version
# 2: processor load
# 3: processor capacity
# 4: string unix version
use Storable;
# Write the node table %H to the monitor state file using Storable.
sub save_state {
    my $state_file = '/root/clusterd.monitor.state';
    store(\%H, $state_file);
}
# Reload the node table %H from the monitor state file (if it exists) and
# reset every node's last_seen field to 0.
sub restore_state {
    my $state_file = '/root/clusterd.monitor.state';
    if (-e $state_file) {
        %H = %{ retrieve($state_file) };
    }
    $_->[1] = 0 for values %H;
}
# Print a status report of the node table %H to the handle-like object $f
# (anything providing print and printf methods):
#   - the lists of UP and DOWN nodes,
#   - the ten UP nodes with the highest load average (record index 3),
#   - the ten UP nodes with the highest processor load (record index 5),
#   - the ten UP nodes with the most free capacity (record index 6),
#     followed by the capacity summed over ALL UP nodes.
# The two first "TOP 10" lists used to print eleven entries; every list is
# now limited to ten.
sub status {
    my ($f)=@_;
    my @up   = grep { $H{$_}->[0] eq 'UP'   } sort keys %H;
    my @down = grep { $H{$_}->[0] eq 'DOWN' } sort keys %H;
    $f->printf("UP (%d): %s\n\n",scalar(@up),join(' ',@up));
    $f->printf("DOWN (%d): %s\n\n",scalar(@down),join(' ',@down));

    # return at most the first ten elements of a list
    my $top10 = sub {
        my @list = @_;
        $#list = 9 if @list > 10;
        return @list;
    };

    my @by_load = sort { $H{$b}->[3] <=> $H{$a}->[3] } @up;
    $f->print("TOP 10: ");
    for my $host ($top10->(@by_load)) {
        $f->print(sprintf "%s (%4.2f) ",$host,$H{$host}->[3]);
    }
    $f->print("\n\n");

    my @by_pload = sort { ($H{$b}->[5]||0) <=> ($H{$a}->[5]||0) } @up;
    $f->print("TOP 10 CPU load : ");
    for my $host ($top10->(@by_pload)) {
        $f->print(sprintf "%s (%3.1f%%) ",$host,($H{$host}->[5]||0)*100);
    }
    $f->print("\n\n");

    my @by_capacity = sort { ($H{$b}->[6]||0) <=> ($H{$a}->[6]||0) } @up;
    my $total_bogomips=0;
    $total_bogomips += ($H{$_}->[6]||0) for @by_capacity;
    $f->print("TOP 10 free capacity : ");
    for my $host ($top10->(@by_capacity)) {
        $f->print(sprintf "%s (%3.1f) ",$host,$H{$host}->[6]||0);
    }
    $f->printf("\ntotal available bogomips: %d\n\n",$total_bogomips);
}
# Emit a monitor message: once via warn, and once, prefixed with the local
# time, to main::mgmt_print_all.
sub msg_text {
    my ($text)=@_;
    warn "$text\n";
    my $stamp = localtime;    # scalar assignment => human readable time string
    main::mgmt_print_all("$stamp: $text\n");
}
# Report a state transition of a node as one formatted line via msg_text.
sub msg_state {
    my ($state_old,$state_new,$hostname,$extra)=@_;
    my $line = sprintf('%-4s -> %-4s %-20s %s',$state_old,$state_new,$hostname,$extra);
    msg_text($line);
}
# Start the node monitor: reload the saved node table, announce recovery mode
# and arm two 630 second timers - one that ends recovery mode and one that
# starts the periodic host timeout check.
sub init {
    restore_state();
    msg_text('node monitor: started. (recovery mode)');
    my $end_recovery = sub {
        $MONITOR_STARTING=0;
        msg_text('node monitor: recovery finished');
    };
    Donald::Select::timeout(630,$end_recovery);
    Donald::Select::timeout(630,\&timeout_hosts);
}
# Timer callback (re-armed every 60 seconds): mark every UP node that has not
# been seen for 1230 seconds as DOWN, then persist the node table.
sub timeout_hosts {
    Donald::Select::timeout_requeue(60);
    my $deadline = Donald::Select::time() - 1230; # 2x10 minutes + 30 seconds
    for my $host (keys %H) {
        my $record = $H{$host};
        next unless $record->[0] eq 'UP';
        next if $record->[1] > $deadline;
        msg_state('UP','DOWN',$host,"timeout!");
        $record->[0] = 'DOWN';
    }
    save_state();
}
# Process one host announcement.
# Arguments: ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)
# Compares $seq with the sequence number expected for the host, reports
# discoveries, reboots, lost packets and duplicates, and finally stores
# ['UP', now, $seq+1, @data] as the host's record in %H.
sub rx_hostannounce {
    my ($host,$seq,@more)=@_;
    my ($load_average,$version,$pload,$pcapacity,$unixrev)=@more;
    $unixrev ||= '?';

    # host not known yet
    if (!exists $H{$host}) {
        if ($seq==0) {
            msg_state('NEW','UP',$host,"discovered new node $version - $unixrev");
        } elsif (!$MONITOR_STARTING) {
            msg_state('NEW','UP',$host,"discovered running node $version - $unixrev");
        }
        $H{$host}=['UP',Donald::Select::time(),$seq+1,@more];
        return;
    }

    my $record=$H{$host};
    my $expected=$record->[2];
    if ($record->[0] eq 'UP') {
        if ($seq==$expected) {
            # in sequence - nothing to report
        } elsif ($seq==0) {
            # msg_state('UP','UP',$host,"node rebootet $version - $unixrev");
        } elsif ($seq>$expected) {
            $MONITOR_STARTING or msg_state('UP','UP',$host,($seq-$expected)." packet(s) lost!");
        } elsif ($seq+1==$expected) {
            warn "DUPLICATE from $host\n";
            $DUPLICATES{$host}=($DUPLICATES{$host}||0)+1;
        } else {
            msg_state('UP','UP',$host,"node rebooted and $seq packet(s) lost!");
        }
    } elsif ($seq==0) {
        msg_state('DOWN','UP',$host,"node rebooted $version - $unixrev");
    } elsif ($seq==$expected) {
        msg_state('DOWN','UP',$host,"sequence error. Node to slow? (seq=$seq,exp=$expected)");
    } else {
        msg_state('DOWN','UP',$host,($seq-$expected)." packet(s) lost");
    }
    @$record=('UP',Donald::Select::time(),$seq+1,@more);
}
# Remove a host from the node table; reports the removal only when the host
# had a record.
sub delete_host {
    my ($host)=@_;
    return unless delete $H{$host};
    msg_text("host $host removed from monitor");
}
#-----------------------------------------------------------------------
package My::NetlogReceiver;
# TCP listen socket of the netlog receiver (created in init, port 1028)
our $listen_socket;
# maximum number of bytes requested per recv() call; also used by the CLP
# and lsof/cmd receive routines further down in this file
our $TCP_MAX=1024;
# day (YYYYMMDD, see day()) for which the daily marker line was last written
our $DAY_LAST_MSG;
# Return the current local date as an eight digit string YYYYMMDD.
sub day {
    my ($mday,$mon,$year) = (localtime)[3,4,5];
    return sprintf "%04d%02d%02d",$year+1900,$mon+1,$mday;
}
# Write a marker line to the log the first time this is called on a new day.
sub bigben {
    my $today = day();
    return if $today le $DAY_LAST_MSG;    # YYYYMMDD strings compare chronologically
    warn "NETLOG ==================================================== morning has broken ====\n";
    $DAY_LAST_MSG = $today;
}
# Timer callback: run the day-change check, then re-arm the timer for
# another 30 seconds.
sub bigben_timer {
    bigben();
    Donald::Select::timeout_requeue(30);
    # (the value of timeout_requeue is this sub's return value, as before)
}
# Remember today's date as the last announced day and start the 30 second
# day-change timer.
sub bigben_init {
    $DAY_LAST_MSG = day();
    my $check_interval = 30;
    Donald::Select::timeout($check_interval,\&bigben_timer);
}
# Reader callback for an accepted netlog connection.
# Appends received bytes to the per-connection buffer ($bufref) and logs every
# complete message. Messages are framed as a 16 bit big-endian length
# followed by that many payload bytes. The reader is only re-queued after
# data was received; on a recv error or an empty read the sub just returns.
sub receive {
    my ($socket,$peernode,$bufref)=@_;
    my $data;
    bigben();
    return unless defined $socket->recv($data,$TCP_MAX);
    # length $data or warn "$peernode: disconnect\n";
    return unless length $data;
    $$bufref.=$data;
    while (length($$bufref)>=2) {
        my $msg_len=unpack('n',$$bufref);
        # warn "wait for $msg_len+2 bytes got ".length($$bufref)."\n";
        last if length($$bufref)<2+$msg_len;
        my $msg=substr($$bufref,2,$msg_len);
        substr($$bufref,0,2+$msg_len,'');    # drop the consumed frame
        $|=1;
        warn "NETLOG $msg\n" unless $msg=~/NETLOG/;
    }
    Donald::Select::reader_requeue();
}
# Reader callback for the netlog listen socket: accept one connection and
# register a reader (receive) for it with its own empty receive buffer.
# accept() can fail (it returns undef, e.g. when the peer already reset the
# connection); that case is now logged and skipped instead of dying on a
# method call on undef.
sub connect_request {
    Donald::Select::reader_requeue();
    my $socket=$listen_socket->accept();
    unless (defined $socket) {
        warn "netlog accept: $!\n";
        return;
    }
    $socket->blocking(0);
    my $peernode=$socket->peerhost;
    my $buffer='';
    Donald::Select::reader($socket,\&receive,$socket,$peernode,\$buffer);
    # warn "$peernode: connect\n";
}
# Start the netlog receiver: listen on TCP port 1028, register the accept
# callback and start the day-change timer. Dies if the socket can not be
# created.
sub init {
    my %listen_args=(Proto=>'tcp',LocalPort=>1028,Listen=>1,ReuseAddr=>1);
    $listen_socket=IO::Socket::INET->new(%listen_args) or die "$!\n";
    Donald::Select::reader($listen_socket,\&connect_request);
    bigben_init();
}
#------------------------------------------------------------------------
package main;
use strict;
use IO::File;
use Sys::Syslog;
use IO::Socket::INET;
use Data::Dumper;
# maximum datagram size requested when receiving on the area socket
our $UDP_MAX=1472; # for broadcast on alphas
# UDP port of the daemon; the area router socket uses $UDP_PORT+1
our $UDP_PORT=234;
our $BC_RATE=8; # packets per second broadcast
# command line options as filled in by GetOptions at the end of the file
our (%options); # RUN OPTIONS
our $donald_s; # Donald::Select::INET udp socket
# local host name, lower case, with the .molgen.mpg.de suffix stripped
our $my_hostname;
our $my_ip; # '141.14.12.12'
# Determine the local host name (lower case, site domain stripped).
$my_hostname=lc `/bin/hostname`;
chomp($my_hostname);
$my_hostname =~ s/\.molgen\.mpg\.de$//;
# Wait until the host name resolves to an address, retrying every 30 seconds.
# The "waiting" warning is printed only once: the flag lives outside the loop
# (it used to be re-declared on every iteration, so the warning repeated).
{
    my $warned;
    until (defined $my_ip) {
        my $addr=inet_aton($my_hostname);
        if (defined $addr) {
            $my_ip=inet_ntoa($addr);
            last;
        }
        warn "no IP (yet)= - waiting\n" unless $warned++;
        sleep 30;
    }
}
# kernel release string as printed by `uname -r`; sent with every stat packet
our $my_unixrev;
$my_unixrev=`uname -r`;
chomp($my_unixrev);
# shared secret used to sign and verify cluster messages; undef while no
# password file could be read (see sync_cluster_pw)
our $CLUSTER_PW;
our $CLUSTER_PW_FILE='/etc/clusterd.password';
# former location of the password file; migrated by sync_cluster_pw
our $OLD_CLUSTER_PW_FILE='/root/clusterd.password';
# mtime of the password file when $CLUSTER_PW was last read
our $CLUSTER_PW_TIMESTAMP=0;
$ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}:''); # for amq , ps , tar (gnu!)
#---------------------------------------------------------- UDP
# sender of the UDP message currently being handled (set by udp_message)
our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234)
# dispatch table: message name (first decoded element) => handler sub.
# Handlers are called with the remaining decoded elements as arguments.
our %UDP_HANDLER=
(
'filedata' => \&udp_rx_filedata,
'amdtardata' => \&udp_rx_amdtardata,
'loadavg.2' => \&udp_rx_loadavg2,
'restart' => \&udp_rx_restart,
'flush-gidcache' => \&udp_rx_flush_gidcache,
'log' => \&udp_rx_log,
'exec' => \&udp_rx_exec,
);
# Callback for a received UDP datagram.
# Records the peer address/port in the globals, verifies the signature with
# the cluster password, decodes the payload and calls the handler registered
# in %UDP_HANDLER for the message name. Messages are ignored when no cluster
# password is loaded, when decoding yields nothing, or when the name is unknown.
sub udp_message {
    my ($data,$x_udp_peer_addr,$x_udp_peer_port,$donald_s)=@_;
    $udp_peer_addr=$x_udp_peer_addr;
    $udp_peer_port=$x_udp_peer_port;
    return unless defined $CLUSTER_PW;
    my @decoded=decode(check_sign($CLUSTER_PW,$data));
    return unless @decoded;
    my ($handler_name,@args)=@decoded;
    exists $UDP_HANDLER{$handler_name} and $UDP_HANDLER{$handler_name}->(@args);
}
# udp_send_message( dst, @args )
# dst is a destination such as '141.14.31.255', 'zork' or '141.14.16.1'.
# Encodes and signs @args with the cluster password and sends the result to
# dst on $UDP_PORT. Does nothing while no cluster password is loaded.
sub udp_send_message {
    my ($ip,@args)=@_;
    return unless defined $CLUSTER_PW;
    my $packet=sign($CLUSTER_PW,encode(@args));
    $donald_s->send_data($ip,$UDP_PORT,$packet);
}
#----------------------------------------------------------
# Pack /etc/amd into /tmp/amd.tar, compute the MD5 digest of the tar file,
# gzip it and broadcast /tmp/amd.tar.gz in 1024 byte chunks as 'amdtardata'
# messages (throttled to $BC_RATE packets per second).
# Changes compared to the previous version:
#   - Digest::MD5 is loaded explicitly (the file never loaded it itself),
#   - the helper processes are reaped with waitpid on their own pid, so $?
#     always belongs to the tar/gzip child,
#   - the handle used for the digest is closed before gzip replaces the file.
# NOTE(review): the fixed file names below /tmp are predictable - confirm this
# is acceptable on the hosts this runs on.
sub push_amd_tar {
    my ($donald_s)=@_;
    require Digest::MD5;
    my $filename='/tmp/amd.tar';

    my $pid=fork;
    defined $pid or return warn "$!\n";
    unless ($pid) {
        chdir '/etc/amd' or die "/etc/amd: $!\n";
        exec 'tar','cf',$filename,'.';
        die "$!\n";
    }
    waitpid($pid,0);
    $? and return;

    my $fh=IO::File->new($filename,'<') or return warn "$filename: $!\n";
    my $digest=Digest::MD5->new->addfile($fh)->digest;
    $fh->close;
    warn "tar digest is ",Digest::MD5::md5_hex($digest),"\n";

    $pid=fork;
    defined $pid or return warn "$!\n";
    unless ($pid) {
        exec 'gzip','-f',$filename;
        die "$!\n";
    }
    waitpid($pid,0);
    $? and return;

    $filename='/tmp/amd.tar.gz';
    my $st=Donald::FileInfo->lstat($filename);
    defined $st or return warn "$filename: $!\n";
    $st->type eq 'F' or return warn "$filename: not a plain file\n";
    $fh=IO::File->new($filename,'<') or return warn "$filename: $!\n";
    my $i=0;
    for (my $pos=0;$pos<$st->size;$pos+=1024) {
        my $data;
        defined $fh->sysread($data,1024) or return warn "$filename: $!\n";
        # warn "send bytes $pos to ",$pos+length($data),"\n";
        udp_broadcast_message($donald_s,'amdtardata',$st,$pos,$data,$digest);
        ++$i % $BC_RATE or sleep 1;    # pause one second after every $BC_RATE packets
    }
}
# Broadcast a plain file (absolute path, at most 40960 bytes) in 1024 byte
# chunks as 'filedata' messages, pausing one second after every $BC_RATE
# packets. Problems are reported with warn and end the transfer.
sub push_file {
    my ($donald_s,$filename)=@_;
    return warn "$filename: please use absolute path\n" unless $filename =~ m"^/";
    my $st=Donald::FileInfo->lstat($filename);
    return warn "$filename: $!\n" unless defined $st;
    return warn "$filename: not a plain file\n" unless $st->type eq 'F';
    return warn "$filename: to big for broadcast (max 40960 bytes)\n" unless $st->size<=40960;
    my $fh=IO::File->new($filename,'<') or return warn "$filename: $!\n";
    my $sent=0;
    my $pos=0;
    while ($pos<$st->size) {
        my $data;
        defined $fh->sysread($data,1024) or return warn "$filename: $!\n";
        # warn "send bytes $pos to ",$pos+length($data),"\n";
        udp_broadcast_message($donald_s,'filedata',$st,$pos,$data);
        sleep 1 if ++$sent % $BC_RATE == 0;
        $pos+=1024;
    }
}
# Commands that may be triggered remotely: name as sent in an 'exec' message
# => command line handed to /bin/sh -c by udp_rx_exec. Names not listed here
# are refused by both send_exec and udp_rx_exec.
our %CMD=(
'mkmotd'=>'/usr/sbin/mkmotd.pl',
);
# Broadcast an 'exec' request for one of the commands named in %CMD.
# Dies with the list of available commands when $cmd is not one of them.
sub send_exec {
    my ($donald_s,$cmd)=@_;
    exists $CMD{$cmd}
        or die "available commands: ",join(' , ',keys %CMD),"\n";
    udp_broadcast_message($donald_s,'exec',$cmd);
}
# Handler for 'exec' messages: run the command registered under $cmd in %CMD.
# The command is started from a double-forked process with stdin, stdout and
# stderr on /dev/null, a 60 second alarm armed and / as working directory.
# The daemon itself only waits for the short-lived intermediate child.
sub udp_rx_exec {
    my ($cmd)=@_;
    warn "exec $cmd\n";
    return unless exists $CMD{$cmd};
    my $child=fork;
    if (!defined $child) {
        warn "$!\n";
        return;
    }
    if ($child==0) {
        # intermediate child: fork again and leave
        my $grandchild=fork;
        exit 1 unless defined $grandchild;
        exit if $grandchild;
        # grandchild: detach stdio and run the command
        open STDIN,'<','/dev/null';
        open STDOUT,'>','/dev/null';
        open STDERR,'>','/dev/null';
        alarm(60);
        chdir '/';
        exec '/bin/sh','-c',$CMD{$cmd};
        exit 1;
    }
    wait;
}
#-------------------------------------------------------------
# normalize_seg( [pos,len], [pos,len], ... )
# Sort the segments by start position and merge segments that overlap or
# touch. Returns the merged list; the array refs passed in are reused and the
# length of the first segment of each merged group is updated in place.
sub normalize_seg {
    my @sorted=sort {$a->[0] <=> $b->[0]} @_;
    my @merged;
    for my $seg (@sorted) {
        my $prev=$merged[-1];
        my $prev_end=defined $prev ? $prev->[0]+$prev->[1] : undef;
        if (defined $prev && $prev_end>=$seg->[0]) {
            # joinable: extend the previous segment to the farther end
            my $seg_end=$seg->[0]+$seg->[1];
            my $end=$prev_end>$seg_end ? $prev_end : $seg_end;
            $prev->[1]=$end-$prev->[0];
        } else {
            push @merged,$seg;
        }
    }
    return @merged;
}
# Multi-packet file transfers in progress, keyed by target file name.
my %RECEIVER; # ( filename => $receiver, .... )
# $receiver : [ st_want , last_rx , io_handle , [ [pos,len] , [pos,len] , ... ] ]
# st_want: file info sent with the data; last_rx: time of the last packet;
# io_handle: handle of the temporary file; last element: segments received so far
# Timer callback (re-armed every 60 seconds): drop every file transfer that
# received no packet for more than 10 seconds and report the timeout locally
# and to the stat target.
sub purge_old_receiver {
    for my $name (keys %RECEIVER) {
        my $receiver=$RECEIVER{$name};
        next unless $receiver->[1]+10<Donald::Select::time();
        warn 'timeout receiving ',$receiver->[0]->name,"\n";
        log_to_stat_target('timeout receiving ',$receiver->[0]->name);
        delete $RECEIVER{$name};
    }
    Donald::Select::timeout_requeue(60);
}
#-------------------------------------------------------------
# digest of the amd tar archive that was installed last (see udp_rx_amdtardata)
our $INSTALLED_DIGEST='';
# set by udp_rx_filedata: 1 when the call completed and installed a file, else 0
our $rx_filedata_done;
# Handler for 'amdtardata' messages: receive one chunk of the amd tar archive
# and, once the archive is complete, unpack it in /etc/amd and refresh the
# automounter (amq -f, /sbin/make-automaps).
# Packets carrying the digest of the archive that is already installed are
# ignored.
# The unpack/install steps now run only when udp_rx_filedata reported a
# completed file; before, they were executed after every packet and relied on
# wait() failing when no child existed. The tar child is reaped with waitpid
# so that $? is guaranteed to be its exit status.
sub udp_rx_amdtardata {
    my ($st_want,$pos,$data,$digest)=@_;
    ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo';
    ### $digest eq $INSTALLED_DIGEST and $pos==0 and warn "/etc/amd - ",Digest::MD5::md5_hex($digest)," already installed\n";
    $digest eq $INSTALLED_DIGEST and return;
    #### $pos==0 and warn "receiving /etc/amd - ",Digest::MD5::md5_hex($digest),"\n";
    udp_rx_filedata($st_want,$pos,$data);
    $rx_filedata_done or return;    # archive not complete yet

    my $pid=fork;
    defined $pid or return warn "$!\n";
    unless ($pid) {
        chdir '/etc/amd' or die "/etc/amd: $!\n";
        exec 'tar','xzf',$st_want->name;
        die "$!\n";
    }
    waitpid($pid,0);
    $? and return;

    require Digest::MD5;    # only needed for the log line below
    warn "installed /etc/amd - ",Digest::MD5::md5_hex($digest),"\n";
    $INSTALLED_DIGEST=$digest;
    system 'amq','-f';
    system '/sbin/make-automaps';
}
# Move a completely received temporary file into place.
# Applies owner, group and permissions from $st_want, replaces $filename and
# sets its mtime. Returns 1 on success, 0 (after a warning) on failure.
sub _install_received_file {
    my ($st_want,$st_is,$tmp_filename,$filename)=@_;
    unless (chown($st_want->uid,$st_want->gid,$tmp_filename)) {
        warn "$tmp_filename: $!\n";
        return 0;
    }
    unless (chmod($st_want->perm,$tmp_filename)) {
        warn "$tmp_filename: $!\n";
        return 0;
    }
    $st_is and unlink($filename);
    unless (rename($tmp_filename,$filename)) {
        warn "$filename: $!\n";
        return 0;
    }
    utime($st_want->mtime,$st_want->mtime,$filename);
    warn "installed $filename\n";
    return 1;
}

# Handler for 'filedata' messages: store one chunk ($data at offset $pos) of
# the file described by $st_want.
# Sets $rx_filedata_done as a side effect: 1 if this call completed and
# installed the file, 0 otherwise.
# Nothing is done when the local file already has the announced size and
# mtime. A file that fits into one packet is written and installed at once;
# larger files are collected in "<name>.tmp" and tracked in %RECEIVER until
# the received segments cover the whole file.
# Changes compared to the previous version:
#   - the single-packet path truncates an existing temporary file, so stale
#     bytes of an older, longer temporary file can not end up in the result,
#   - write and rename errors are checked; a failed install no longer logs
#     "installed" or reports the file as done,
#   - sysseek is used for positioning because the data is written with
#     syswrite,
#   - a receiver whose final install failed is dropped, so that a
#     retransmission starts with a fresh temporary file.
sub udp_rx_filedata {
    my ($st_want,$pos,$data)=@_;
    ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo';
    my $filename=$st_want->name;
    my $tmp_filename="$filename.tmp";
    $rx_filedata_done=0;
    my $st_is=Donald::FileInfo->lstat($filename);
    if ($st_is && $st_is->type eq 'F' && $st_is->size==$st_want->size && $st_is->mtime==$st_want->mtime) {
        #### $pos==0 and warn " $filename seems to be current\n";
        return;
    }

    if (length($data) == $st_want->size) {
        # complete file in one broadcast
        my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_TRUNC,0);
        defined $fh or return warn "$tmp_filename: $!\n";
        my $written=$fh->syswrite($data);
        unless (defined $written && $written==length($data)) {
            warn "$tmp_filename: write failed: $!\n";
            $fh->close;
            return;
        }
        $fh->close;
        _install_received_file($st_want,$st_is,$tmp_filename,$filename) or return;
        $rx_filedata_done=1;
        return;
    }
    length($data) or return; # shouldn't happen.

    my $receiver=$RECEIVER{$filename};
    if (defined $receiver) {
        # a transfer of a different version of the file is restarted
        if ( $receiver->[0]->size != $st_want->size or $receiver->[0]->mtime != $st_want->mtime ) {
            $receiver=undef;
        }
    }
    unless (defined $receiver) {
        # create new receiver
        ## warn "start receiving $filename from $udp_peer_addr\n";
        -e $tmp_filename and unlink($tmp_filename);
        my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0);
        defined $fh or return warn "$tmp_filename: $!\n";
        $receiver = [$st_want,Donald::Select::time(),$fh,[]];
        $RECEIVER{$filename}=$receiver;
    }

    # warn "$filename: receive $pos length ",length($data),"\n";
    # write data ( size cant be 0 here ); sysseek yields "0 but true" for offset 0
    $receiver->[2]->sysseek($pos,0) or return warn "$tmp_filename: $!\n";
    $receiver->[2]->syswrite($data) or return warn "$tmp_filename: $!\n";
    $receiver->[1]=Donald::Select::time();
    my $segments=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])];
    #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n";

    # all there ?
    if (@$segments == 1 && $segments->[0]->[0]==0 && $segments->[0]->[1]==$st_want->size) {
        $receiver->[2]->close;
        delete $RECEIVER{$filename};
        _install_received_file($st_want,$st_is,$tmp_filename,$filename) or return;
        $rx_filedata_done=1;
    }
}
#----------------------------------------------------------- sample running proc every 5 seconds
# ring of the most recent samples taken by sample_rproc (oldest first)
our @proc_running=(0)x12; # 12 samples every 5 seconds -> minute average
# ring of the most recent one-minute averages (oldest first)
our @proc_running_10=(0)x10; # 10 samples every minute -> 10 minute average
# counts sample_rproc calls since the last entry was added to @proc_running_10
our $SAMPLE_TICK=0;
# Return the value of the "procs_running" line of /proc/stat, or 0 when the
# file can not be opened or has no such line.
# Uses a lexical file handle and a lexical line variable: the handle is
# closed on every return path and the caller's $_ is left untouched (the
# former bareword handle stayed open on the early return and the read loop
# overwrote the global $_).
sub xget_cur_running_proc {
    open my $stat,'<','/proc/stat' or return 0;
    while (my $line=<$stat>) {
        # warn $line;
        return $1 if $line =~ /^procs_running\s*(\d+)/;
    }
    return 0;
}
# Return the current number of running processes. When the CPU count is
# known and the number exceeds it by more than one, a debug snapshot is
# written first (pload_debug).
sub get_cur_running_proc {
    our $CPUS;
    my $running=xget_cur_running_proc();
    pload_debug("more than $CPUS+1 proc") if $CPUS && $running>$CPUS+1;
    return $running;
}
# Return the arithmetic mean of the samples in @proc_running.
sub running_proc {
    my $samples=@proc_running;
    my $total=0;
    for my $sample (@proc_running) {
        $total+=$sample;
    }
    return $total/$samples;
}
# Return the arithmetic mean of the one-minute averages in @proc_running_10.
sub running_proc_10 {
    my $samples=@proc_running_10;
    my $total=0;
    for my $average (@proc_running_10) {
        $total+=$average;
    }
    return $total/$samples;
}
# Timer callback, runs every 5 seconds: push a new sample (running processes
# minus one) into the @proc_running ring and, after every 12th sample, push
# the current one-minute average into the @proc_running_10 ring.
# The tick counter used to let 13 calls (65 seconds) pass between two entries
# of the ten minute ring; it now counts exactly 12 samples (60 seconds).
sub sample_rproc { # every 5 seconds
    # take the sample first: get_cur_running_proc may dump the rings for debugging
    my $sample=get_cur_running_proc()-1;
    shift @proc_running;
    push @proc_running,$sample;
    if (++$SAMPLE_TICK>=12) {
        shift @proc_running_10;
        push @proc_running_10,running_proc();
        $SAMPLE_TICK=0;
    }
    Donald::Select::timeout_requeue(5);
}
#----------------------------------------------------------- stat
#-----------------------------------------------------------
# filled by init_cpuinfo from /proc/cpuinfo: number of "bogomips" lines found
# and the sum of their values
our ($CPUS,$BOGOMIPS);
# Count the "bogomips" lines of /proc/cpuinfo into $CPUS and add their values
# up in $BOGOMIPS. Does nothing on alpha systems or when the file can not be
# opened.
# Reads through a lexical handle and line variable, so the file is closed
# again and the caller's $_ is not overwritten.
sub init_cpuinfo {
    Donald::Tools::is_alpha() and return;
    open my $cpuinfo,'<','/proc/cpuinfo' or return;
    while (my $line=<$cpuinfo>) {
        if ($line =~ /^bogomips\s*:\s*([\d.]+)/) {
            $CPUS++;
            $BOGOMIPS+=$1;
        }
    }
    close $cpuinfo;
    return;
}
# Return load information for this host.
#   alpha: (system load average)
#   linux: (system load average, pload, free capacity) when the CPU count is
#          known, otherwise just (system load average)
# pload is the averaged number of running processes divided by the number of
# CPUs; the free capacity is (CPUs - running processes) * $BOGOMIPS, or 0 when
# at least as many processes are running as there are CPUs.
# Returns undef when the load average can not be determined.
# /proc/loadavg is now read through a lexical handle instead of the shared
# bareword handle L.
sub loadavg { # AXP : (system load average) , LINUX: (system load average, pload, freebogo)
    if (Donald::Tools::is_alpha()) {
        my $data=pack 'l!3lx4l!3';
        my $i=syscall(85,3,0,$data,1,length($data)); # table(id=TBL_LOADAVG,index=0,addr=data,nel=1,lel=56)
        my ($l0,$l1,$l2,$scale,$mach0,$mach1,$mach2)=unpack 'l!3lx4l!3',$data;
        $scale or return undef;
        return $l0/$scale;
    }

    $CPUS or init_cpuinfo();
    my $running_proc=running_proc();
    open my $fh,'<','/proc/loadavg' or return undef;
    my $data;
    sysread($fh,$data,1024);
    close $fh;
    # first field of /proc/loadavg: the 1 minute load average
    $data =~ /^(\d+\.?\d*)/ or return undef;
    my $load_average=$1+0;
    return $load_average unless $CPUS;
    my $free_capacity=$running_proc>=$CPUS ? 0 : ($CPUS-$running_proc)*$BOGOMIPS;
    return ($load_average,$running_proc/$CPUS,$free_capacity);
}
# host that stat ('loadavg.2') and 'log' messages are sent to; the daemon on
# the host with this name runs the node monitor
our $STAT_TARGET='lol';
# sequence number of the next stat message sent by send_stat
our $STAT_SEQ=0;
# Timer callback (re-armed every 600 seconds): send this host's load data,
# version and unix revision to the stat target as a 'loadavg.2' message.
# Nothing is sent when the load average is not available.
sub send_stat {
    Donald::Select::timeout_requeue(600);
    my ($load_avg,$pload,$pcapacity)=loadavg();
    return unless defined $load_avg;
    my @announcement=($my_hostname,$STAT_SEQ++,$load_avg,version_info(),$pload,$pcapacity,$my_unixrev);
    udp_send_message($STAT_TARGET,'loadavg.2',@announcement);
}
# Handler for 'loadavg.2' messages: on the stat target host, pass the
# announcement on to the node monitor; on all other hosts do nothing.
sub udp_rx_loadavg2 {
    my ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)=@_;
    my @announcement=($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev);
    $my_hostname eq $STAT_TARGET
        and My::Cluster::Updown::rx_hostannounce(@announcement);
}
# Handler for 'log' messages: hand the text to the node monitor's message
# output.
sub udp_rx_log {
    my ($msg)=@_;
    return My::Cluster::Updown::msg_text($msg);
}
# Send a 'log' message, prefixed with our host name, to the stat target.
# All arguments are concatenated to form the message text.
sub log_to_stat_target {
    my $msg=join('',@_);
    udp_send_message($STAT_TARGET,'log',"$my_hostname: $msg");
}
# ----------------------------------------------------------
# Handler for 'restart' messages: start a fresh daemon that replaces this one
# ('/sbin/clusterd --kill --daemon').
# double-fork, because kill_previous_server() won't kill its parent
# The inner test used to look at $pid instead of $pid2, so the intermediate
# child and the grandchild both exec'ed a new daemon. Now only the grandchild
# execs, the intermediate child exits right away and is reaped here.
sub udp_rx_restart {
    my $pid=fork;
    unless (defined $pid) {
        warn "$!\n";
        return;
    }
    if ($pid==0) {
        # intermediate child
        my $pid2=fork;
        defined $pid2 or exit 1;
        $pid2 and exit 0;
        # grandchild
        exec '/sbin/clusterd','--kill','--daemon';
        die "exec failed: $!\n";
    }
    waitpid($pid,0);
    return;
}
# Handler for 'flush-gidcache' messages: write the current time to
# /proc/net/rpc/auth.unix.gid/flush; a failing open is reported with warn.
sub udp_rx_flush_gidcache {
    my $opened=open(my $out,'>','/proc/net/rpc/auth.unix.gid/flush');
    return warn "proc/net/rpc/auth.unix.gid/flush: $!\n" unless $opened;
    return print {$out} time();
}
#----------- tcp mgmt console -----------------------------
# TCP port of the management console
our $MGMT_PORT=234;
# listen socket of the management console (created in mgmt_init)
our $mgmt_listen_socket;
# open console connections: stringified socket => socket object
our %mgmt_sockets;
# Open the management console listen socket on $MGMT_PORT and register the
# accept callback. Dies when the socket can not be created.
sub mgmt_init {
    my %listen_args=(LocalPort=>$MGMT_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1);
    $mgmt_listen_socket=IO::Socket::INET->new(%listen_args);
    die "$!\n" unless defined $mgmt_listen_socket;
    Donald::Select::reader($mgmt_listen_socket,\&mgmt_connect_request);
}
# Reader callback for the management listen socket: accept one console
# connection, register its reader, remember it in %mgmt_sockets and print the
# greeting.
# A failing accept() (undef result) is now logged and skipped instead of
# dying on a method call on undef.
sub mgmt_connect_request {
    Donald::Select::reader_requeue();
    # listen socket ready
    my $socket=$mgmt_listen_socket->accept();
    unless (defined $socket) {
        warn "mgmt accept: $!\n";
        return;
    }
    $socket->blocking(0);
    my $peernode=$socket->peerhost;
    ### warn "accepted mgmt connection from $peernode\n";
    Donald::Select::reader($socket,\&mgmt_receive,$socket);
    $mgmt_sockets{$socket}=$socket;
    $socket->print("clusterd ".version_info()." stupid console\n");
    $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n");
}
# Reader callback for a management console connection: read one command and
# execute it.
# The connection is closed on a read error, on end of file or when a single
# ^D character is received. Empty lines are ignored, unknown commands print
# the command overview.
sub mgmt_receive {
    my ($s)=@_;
    my $data;
    my $len=$s->sysread($data,1024);
    my $gone=!defined $len || $len==0 || ($len==1 && $data eq "\cD");
    if ($gone) {
        delete $mgmt_sockets{$s};
        $s->close;
        ###warn "lost connection to mgmt port\n";
        return;
    }
    Donald::Select::reader_requeue();
    $data=~s/\r?\n$//;
    return unless length $data;

    # commands that are recognized by exact match
    my %command=(
        'l' => sub {
            My::Cluster::Updown::status($s) if $my_hostname eq $STAT_TARGET;
            $s->print("AREA: ",area_config_as_string(),"\n");
            $s->print("STAT TARGET: ",$STAT_TARGET,"\n");
            $s->print(' (',scalar(localtime),')',"\n");
        },
        'r' => sub {
            for my $host (grep { $H{$_}->[0] eq 'UP' } sort keys %H) {
                $s->printf("%-40s : %s\n",$host,$H{$host}->[7]);
            }
        },
        'v' => sub {
            $Data::Dumper::Terse=1;
            $Data::Dumper::Indent=0;
            for my $host (sort keys %H) {
                $s->printf("%15s : %s\n",$host,Dumper($H{$host}));
            }
        },
        'd' => sub {
            my $running_proc=running_proc();
            my $running_proc_10=running_proc_10();
            $s->printf("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc);
            $s->printf("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10);
            $CPUS or init_cpuinfo();
            return unless $CPUS;
            for my $entry (['run',$running_proc],['run10',$running_proc_10]) {
                my ($label,$run)=@$entry;
                my $capacity=$run>=$CPUS ? 0 : ($CPUS-$run)*$BOGOMIPS;
                $s->printf("$label: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$run,$CPUS,$BOGOMIPS,$run/$CPUS,$capacity);
            }
        },
        'dup show' => sub {
            for my $host (sort {$DUPLICATES{$b} <=> $DUPLICATES{$a} } keys %DUPLICATES) {
                $s->printf("%-10s : %5d\n",$host,$DUPLICATES{$host});
            }
        },
        'dup clear' => sub {
            %DUPLICATES=();
        },
    );

    if (exists $command{$data}) {
        $command{$data}->();
    } elsif ($data =~ /^delete (\S+)$/) {
        My::Cluster::Updown::delete_host($1);
    } else {
        $s->print(<<"_EOF_");
unknown command: $data
l : list status
v : dump status array
d : debug (cpu speed calc)
r : dump unix revisions
delete HOST : forget about HOST
dup show : show duplicates stat
dup clear : clear duplicates stat
to exit use ^D
_EOF_
    }
}
# Print a message to every open management console connection and, when
# running in the foreground without syslog, also to standard output.
sub mgmt_print_all {
    my ($msg)=@_;
    print $msg if $options{'foreground'} && !$options{'syslog'};
    $_->print($msg) for values %mgmt_sockets;
}
#-----------------------------------------------------------
# CLP handler 'LSOF': stream the output of lsof to the client.
# Each output line (only lines containing $pattern, if one is given) is sent
# as one frame: 16 bit big-endian length followed by the line. The work is
# done in a double-forked process; the daemon closes its copy of the socket,
# waits for the intermediate child and returns 1 to tell the caller that the
# connection has been handed over.
# lsof is now started with the list form of a piped open on a lexical handle
# (no bareword handle, no two-argument open) and lines are read into a
# lexical variable.
sub clp_rx_LSOF {
    my ($socket,$pattern)=@_;
    my $pid=fork;
    unless (defined $pid) {
        warn "$!\n";
        return;
    }
    unless ($pid) {
        # intermediate child
        my $worker=fork;
        defined $worker or die "$!\n";
        $worker and exit;
        # grandchild: does the actual work
        $socket->blocking(1);
        open(my $lsof,'-|','lsof') or die "$!\n";
        while (my $line=<$lsof>) {
            next if defined $pattern && index($line,$pattern)<0;
            $socket->send(pack('n',length($line)).$line,0);
        }
        close $lsof;
        close $socket;
        exit;
    }
    close $socket;
    wait;
    return 1;
}
# Run @cmd in a child process and relay its output to $socket.
# Frames sent to the socket consist of a 16 bit big-endian length followed by
# a one character channel tag and data:
#   'O' + data   bytes read from the command's stdout
#   'E' + data   bytes read from the command's stderr
#   'X' + status the value of $? after both pipes reached end of file
# The calling process only forks and returns; all relaying happens in the
# forked child, which exits when it is done.
# NOTE(review): the caller does not wait for the forked child here - confirm
# that it is reaped elsewhere.
sub run_cmd {
my ($socket,@cmd)=@_;
my $pid=fork;
unless (defined $pid) {
warn"$!\n";
return;
}
unless ($pid) {
# relay process: one pipe per output stream of the command
my $opipe=new IO::Pipe;
my $epipe=new IO::Pipe;
my $cpid=fork;
defined $cpid or die "$!\n";
unless ($cpid) {
# command process: stdin from /dev/null, stdout/stderr into the pipes
warn "exec ".join(' ',@cmd)."\n";
$opipe->writer();
$epipe->writer();
open STDIN,'<','/dev/null';
open STDOUT,'>&',$opipe;
open STDERR,'>&',$epipe;
exec @cmd;
die "$!\n";
}
# $::SIG{'CHLD'}=sub {
# my $pid=wait;
# my $buffer="X$?";
# $socket->send(pack('n',length($buffer)).$buffer,0);
# exit;
# };
$opipe->reader();
$epipe->reader();
my $ofn=$opipe->fileno;
my $efn=$epipe->fileno;
# bit vectors for select(): watch both pipes for readability
my ($rvec_in,$wvec_in,$evec_in)=('','','');
vec($rvec_in,$ofn,1)=1;
vec($rvec_in,$efn,1)=1;
# number of pipes that have not reached end of file yet
my $channel=2;
my $buffer;
$socket->blocking(1);
while (1) {
my ($rvec,$wvec,$evec)=($rvec_in,$wvec_in,$evec_in);
# give up when neither pipe becomes readable within 60 seconds
my $ready=select($rvec,$wvec,$evec,60);
$ready or die "timeout\n";
if (vec($rvec,$ofn,1)) {
# 1024-1-2: leaves room for the channel tag and the length prefix
my $len=$opipe->sysread($buffer,1024-1-2);
defined $len or die "$!\n";
if ($len) {
$socket->send(pack('n',$len+1)."O$buffer",0);
} else {
# end of file on stdout: stop watching this pipe
vec($rvec_in,$ofn,1)=0;
$opipe->close;
$channel--;
}
}
if (vec($rvec,$efn,1)) {
my $len=$epipe->sysread($buffer,1024-1-2);
defined $len or die "$!\n";
if ($len) {
$socket->send(pack('n',$len+1)."E$buffer",0);
} else {
# end of file on stderr: stop watching this pipe
vec($rvec_in,$efn,1)=0;
$epipe->close;
$channel--;
}
}
if ($channel==0) {
# both pipes closed: collect the command's status and report it
my $pid=wait;
my $buffer="X$?";
$socket->send(pack('n',length($buffer)).$buffer,0);
exit;
}
}
}
}
#----------- CLP cluster protocol -----------------------------
# TCP port of the cluster protocol (CLP) service
our $CLP_PORT=235;
# CLP listen socket (created in clp_init)
our $clp_listen_socket;
# accepted CLP connections: stringified socket => socket object
our %clp_sockets;
# dispatch table: CLP message name => handler, called as handler($socket,@args)
our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF);
# Open the CLP listen socket on $CLP_PORT and register the accept callback.
# Dies when the socket can not be created.
sub clp_init {
    my %listen_args=(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1);
    $clp_listen_socket=IO::Socket::INET->new(%listen_args);
    die "$!\n" unless defined $clp_listen_socket;
    Donald::Select::reader($clp_listen_socket,\&clp_connect_request);
}
# Reader callback for the CLP listen socket: accept one connection, register
# its reader with an empty receive buffer and remember it in %clp_sockets.
# A failing accept() (undef result) is now logged and skipped instead of
# dying on a method call on undef.
sub clp_connect_request {
    Donald::Select::reader_requeue();
    # listen socket ready
    my $socket=$clp_listen_socket->accept();
    unless (defined $socket) {
        warn "clp accept: $!\n";
        return;
    }
    $socket->blocking(0);
    my $peernode=$socket->peerhost;
    my $buffer='';
    Donald::Select::reader($socket,\&clp_receive,$socket,\$buffer);
    $clp_sockets{$socket}=$socket;
}
# Reader callback for a CLP connection: collect received bytes in the
# connection's buffer and pass every complete frame (16 bit big-endian length
# followed by the payload) to clp_message.
# A true result of clp_message means that the handler took over and closed
# the connection, so no further frames are processed and the reader is not
# re-queued.
# Bookkeeping fix: connections are registered in %clp_sockets, but the entry
# was never removed (the old code deleted from %mgmt_sockets instead). The
# entry is now removed when the peer disconnects, when recv fails and when a
# handler took over the connection.
sub clp_receive {
    my ($s,$bufref)=@_;
    my $data;
    my $peer=$s->recv($data,$TCP_MAX);
    if (!defined $peer || !length($data)) {
        # receive error or connection closed by peer
        delete $clp_sockets{$s};
        $s->close;
        return;
    }
    $$bufref.=$data;
    while (length($$bufref)>=2) {
        my $msg_len=unpack('n',$$bufref);
        last if length($$bufref)<2+$msg_len;
        my $msg=substr($$bufref,2,$msg_len);
        $$bufref=substr($$bufref,2+$msg_len);
        if (clp_message($s,$msg)) {
            delete $clp_sockets{$s};
            return;
        }
    }
    Donald::Select::reader_requeue();
}
# Verify and decode one CLP frame and call the handler registered in
# %CLP_HANDLER for its message name with ($socket,@args). Frames are ignored
# while no cluster password is loaded, when decoding yields nothing, or when
# the name is unknown. The handler's result is the result of this sub.
sub clp_message {
    my ($socket,$data)=@_;
    return unless defined $CLUSTER_PW;
    my @decoded=decode(check_sign($CLUSTER_PW,$data));
    return unless @decoded;
    my ($handler_name,@args)=@decoded;
    exists $CLP_HANDLER{$handler_name} and $CLP_HANDLER{$handler_name}->($socket,@args);
}
# clp_send_message($socket, @args)
# Encode and sign @args and send them as one frame (16 bit big-endian length
# followed by the signed data). Returns nothing while no cluster password is
# loaded and 0 when the socket has no peer.
sub clp_send_message {
    my ($s,@args)=@_;
    return unless defined $CLUSTER_PW;
    my $data=sign($CLUSTER_PW,encode(@args));
    return 0 unless $s->peername;
    my $frame=pack('n',length($data)).$data;
    $s->send($frame);
}
# CLP handler 'CMD': run the command given by @args via run_cmd, close our
# copy of the connection and return 1 to tell the caller that the connection
# has been handed over.
sub clp_rx_CMD {
    my ($socket,@command)=@_;
    run_cmd($socket,@command);
    close $socket;
    return 1;
}
#----------------------------------------------------------
#our $CLUSTER_PW;
#our $CLUSTER_PW_FILE='/etc/clusterd.password';
#our $OLD_CLUSTER_PW_FILE='/root/clusterd.password';
#our $CLUSTER_PW_TIMESTAMP=0;
# Load the cluster password from $CLUSTER_PW_FILE into $CLUSTER_PW.
# - If the file is missing but the old password file exists, the old file is
#   copied to the new location and removed.
# - The password is (re-)read when none is loaded yet or when the mtime of the
#   file differs from the one remembered in $CLUSTER_PW_TIMESTAMP.
# - If the file does not exist or can not be opened, $CLUSTER_PW becomes undef.
# Except for the early error returns of the migration step, the sub schedules
# itself to run again in 60 seconds.
# Returns true when a password is loaded.
sub sync_cluster_pw {
    my $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE);

    # upgrade : move cluster password file from /root to /etc
    if (!$st && -e $OLD_CLUSTER_PW_FILE) {
        warn "upgrading cluster password file location $OLD_CLUSTER_PW_FILE -> $CLUSTER_PW_FILE\n";
        my $in=IO::File->new($OLD_CLUSTER_PW_FILE,'<');
        if (!defined $in) {
            warn "$OLD_CLUSTER_PW_FILE: $!\n";
            return undef;
        }
        my $out=IO::File->new($CLUSTER_PW_FILE,O_WRONLY|O_CREAT,0600);
        if (!defined $out) {
            warn "$CLUSTER_PW_FILE: $!\n";
            return undef;
        }
        my $data;
        $in->read($data,1024);
        $out->write($data);
        $in->close;
        $out->close;
        $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE);
        die "$CLUSTER_PW_FILE: $!\n" unless defined $st;
        unlink $OLD_CLUSTER_PW_FILE;
    }

    if (!$st) {
        # no password file
        warn "$CLUSTER_PW_FILE: $!\n" if defined $CLUSTER_PW;
        $CLUSTER_PW=undef;
    } elsif (!defined $CLUSTER_PW or $CLUSTER_PW_TIMESTAMP != $st->mtime) {
        # new or changed password file
        my $fh=IO::File->new($CLUSTER_PW_FILE,'<');
        if (defined $fh) {
            warn "update cluster password\n" if defined $CLUSTER_PW;
            $fh->read($CLUSTER_PW,1024);
            $CLUSTER_PW_TIMESTAMP=$st->mtime;
        } else {
            warn "$CLUSTER_PW_FILE: $!\n" if defined $CLUSTER_PW;
            $CLUSTER_PW=undef;
        }
    }

    Donald::Select::timeout(60,\&sync_cluster_pw);
    return defined $CLUSTER_PW;
}
#------------------------------------------------------------
# area routing
# area routers: host name of the router => address that router re-sends
# received messages to (see area_message). Broadcast messages are sent to
# every host named here (see udp_broadcast_message).
our %AREA_ROUTER=
(
lol => '141.14.31.255',
# orkrist=> '10.14.0.255',
);
# UDP socket on which an area router receives messages to forward (init_area)
our $area_socket;
# If this host is listed in %AREA_ROUTER, open the area socket on UDP port
# $UDP_PORT+1 and register area_message as its reader. Dies when the socket
# can not be created.
sub init_area {
    return unless exists $AREA_ROUTER{$my_hostname};
    warn "I am area router for $AREA_ROUTER{$my_hostname}\n";
    $area_socket=IO::Socket::INET->new(Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n";
    Donald::Select::reader($area_socket,\&area_message,$area_socket);
}
# Reader callback of the area socket: receive one datagram and re-send it
# unchanged to the address configured for this host in %AREA_ROUTER, port
# $UDP_PORT.
# A failed recv (undef result) is now logged and skipped; before, the undef
# peer address was handed to unpack_sockaddr_in, which dies. The sender's
# address and port were decoded but never used, so that step is gone.
sub area_message {
    my ($area_socket)=@_;
    my $data;
    my $peer=$area_socket->recv($data,$UDP_MAX);
    if (defined $peer) {
        $donald_s->send_data($AREA_ROUTER{$my_hostname},$UDP_PORT,$data); # broadcast to our network
    } else {
        warn "area socket: $!\n";
    }
    Donald::Select::reader_requeue();
}
# Encode and sign @args and send the result to every area router (the keys of
# %AREA_ROUTER) on port $UDP_PORT+1. Does nothing while no cluster password is
# loaded.
sub udp_broadcast_message {
    my ($donald_s,@args)=@_;
    return unless defined $CLUSTER_PW;
    my $packet=sign($CLUSTER_PW,encode(@args));
    my $router_port=$UDP_PORT+1;
    for my $router (keys %AREA_ROUTER) {
        $donald_s->send_data($router,$router_port,$packet);
    }
}
# Return the area router table as one string: "host => address" pairs joined
# by commas.
sub area_config_as_string {
    my @pairs;
    for my $router (keys %AREA_ROUTER) {
        push @pairs,"$router => $AREA_ROUTER{$router}";
    }
    return join(',',@pairs);
}
#------------------------------------------------------------
our $PROG_FILE; # saved $0 - may be relative path
# mtime of $PROG_FILE as seen by the first check; a later change triggers a
# restart (check_progfile_status) and the value is part of version_info()
our $PROG_MTIME;
# Check whether the program file changed on disk and restart from it if so.
# The first successful call remembers the file's mtime in $PROG_MTIME; later
# calls re-exec the program (keeping the --foreground / --syslog options) when
# the mtime differs. The check schedules itself again in 60 seconds.
# The check used to stop for good once lstat failed (for example while the
# file was being replaced), because it returned before scheduling the next
# run; it now always re-schedules. A failing exec is reported.
sub check_progfile_status {
    defined $PROG_FILE or $PROG_FILE=$0;
    if (my @f=lstat $PROG_FILE) {
        if (!defined $PROG_MTIME) {
            $PROG_MTIME=$f[9];
        } elsif ($f[9] != $PROG_MTIME) {
            warn "progfile $PROG_FILE has changed - upgrade restart from version ".version_info()."\n";
            exec $PROG_FILE,'--daemon',($options{'foreground'}?'--foreground':()),($options{'syslog'}?'--syslog':());
            warn "exec $PROG_FILE: $!\n";
        }
    }
    Donald::Select::timeout(60,\&check_progfile_status);
}
# Return the version string, e.g. 'V1.31 - 20090617-155314': the RCS revision
# followed by the local modification time of the program file, or '?' when
# that time is not known yet.
sub version_info { # 'V1.31 - 20090617-155314'
    my $stamp='?';
    if (defined $PROG_MTIME) {
        my ($sec,$min,$hour,$mday,$mon,$year)=localtime($PROG_MTIME);
        $stamp=sprintf '%4d%02d%02d-%02d%02d%02d',$year+1900,$mon+1,$mday,$hour,$min,$sec;
    }
    return "V$RCS_REVISION - $stamp";
}
#------------------------------------------------------------
# Write a debug snapshot to /var/log/pload_debug.txt (the file is
# overwritten): the reason $why with a time stamp, the sample rings with their
# averages, the derived load/capacity figures, and the output of uptime, ps
# and the contents of /proc/stat and /proc/swaps.
# The header is now written through a lexical handle that is closed BEFORE
# the external commands append to the file. Previously the buffered header
# was flushed at offset 0 only when the handle was closed at the end, which
# overwrote the beginning of what the commands had appended.
sub pload_debug {
    my ($why)=@_;
    open my $out,'>','/var/log/pload_debug.txt' or return;
    print {$out} "$why at ".scalar(localtime),"\n";
    my $running_proc=running_proc();
    my $running_proc_10=running_proc_10();
    printf {$out} ("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc);
    printf {$out} ("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10);
    $CPUS or init_cpuinfo();
    $CPUS and printf {$out} ("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS);
    $CPUS and printf {$out} ("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS);
    close $out;    # flush the header before anything is appended
    system "uptime>>/var/log/pload_debug.txt";
    system "ps -AlfT>>/var/log/pload_debug.txt";
    system "cat /proc/stat>>/var/log/pload_debug.txt";
    system "cat /proc/swaps>>/var/log/pload_debug.txt";
    return;
}
# Timer callback (re-armed every 600 seconds): warn when the ten minute
# average of running processes exceeds 150% of the number of CPUs. Does
# nothing while the CPU count is unknown.
# The ten minute average is computed once (it used to be computed twice, with
# one result unused) and the empty prototype, which has no effect on the
# callback use of this sub, has been dropped.
sub check_overload {
    Donald::Select::timeout_requeue(600);
    $CPUS or return;
    my $pload_10=running_proc_10()/$CPUS;
    $pload_10>1.5 and warn(sprintf("pload>150%% (%3.1f%%)\n",$pload_10*100));
}
#####################################################
# client mode (lsof): number of remote connections that are still open
my $slave=0;
# client mode: exit status of the client; set to 1 by cmd_msg when a remote
# command reports a non-zero status
my $exit_value=0;
# Expand the given NIS netgroups into a list of host names.
# Each netgroup is looked up with "ypmatch NAME netgroup". Entries of the form
# (host,user,domain) contribute their host part, plain names are treated as
# nested netgroups and expanded as well, anything else is reported and
# skipped.
# Every netgroup is expanded only once (tracked in %DID). The hash was
# declared before but never used, so netgroups that reference each other made
# this loop run forever.
sub expand_netgroup_hosts {
    my (@netgroups)=@_;
    my %DID=();
    my @out=();
    while (@netgroups) {
        my $ng=pop @netgroups;
        next if $DID{$ng}++;    # already expanded
        for my $entry (split ' ',`ypmatch $ng netgroup`) {
            if ($entry=~/^\((\S+),.*,.*\)$/) {
                push @out,$1;
            } elsif ($entry=~/^[a-z0-9_]+$/i) {
                push @netgroups,$entry;
            } else {
                warn "ignored entry $entry\n";
            }
        }
    }
    return @out;
}
# Client mode --lsof: send an 'LSOF' request with $pattern to every host of
# netgroup AMD_SHORT (except 'tux'), register a reader for each connection
# and run the select loop until all of them are finished. Hosts that can not
# be connected are reported and skipped. Dies when no cluster password is
# available.
sub lsof {
    my ($pattern)=@_;
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    #for my $host ('theinternet') {
    my @hosts=grep { $_ ne 'tux' } sort(expand_netgroup_hosts('AMD_SHORT'));
    for my $host (@hosts) {
        my $connection=IO::Socket::INET->new(PeerAddr=>$host,PeerPort=>$CLP_PORT);
        if (!defined $connection) {
            warn "$host: $!\n";
            next;
        }
        clp_send_message($connection,'LSOF',$pattern);
        # per connection: buffer for partial frames and for a partial output line
        my ($pbuffer,$olbuffer)=('','');
        Donald::Select::reader($connection,\&lsof_rx,$host,$connection,\$pbuffer,\$olbuffer);
        $slave++;
    }
    Donald::Select::run() if $slave;
}
# Reader callback for an lsof client connection.
# Collects received bytes in the frame buffer ($pbufref) and hands every
# complete frame (16 bit big-endian length followed by the payload) to
# lsof_msg. When the peer closes the connection the counter of open
# connections is decremented and the program exits with $exit_value once no
# connection is left.
sub lsof_rx {
    my ($host,$s,$pbufref,$olbufref)=@_;
    my $data;
    return unless defined $s->recv($data,$TCP_MAX*2);
    #print "$host : received ",length($data),"\n";
    unless (length($data)) {
        close $s;
        --$slave;
        exit $exit_value unless $slave;
        return;
    }
    $$pbufref.=$data;
    while (length($$pbufref)>=2) {
        my $frame_len=unpack('n',$$pbufref);
        #print "$host : unpacked length $frame_len\n";
        last if length($$pbufref)<2+$frame_len;
        my $msg=substr($$pbufref,2,$frame_len);
        substr($$pbufref,0,2+$frame_len,'');    # drop the consumed frame
        #print "$host : remaining pbuf length ",length($$pbufref),"\n";
        #print "$host : message length ",length($msg),"\n";
        lsof_msg($host,$s,$msg,$olbufref);
    }
    Donald::Select::reader_requeue();
}
# Print the complete lines contained in the kept partial line ($$olbufref)
# plus the new data, each prefixed with the host name, and keep the trailing
# incomplete line in $$olbufref for the next call.
sub lsof_msg {
    my ($host,$s,$data,$olbufref)=@_;
    my $pending=$$olbufref.$data;
    # cut complete lines off the front, one at a time
    print "$host $1" while $pending=~s/\A([^\n]*\n)//;
    $$olbufref=$pending;
}
# Reader callback for a remote command connection.
# Collects received bytes in the frame buffer ($pbufref) and hands every
# complete frame (16 bit big-endian length followed by the payload) to
# cmd_msg. When the peer closes the connection the counter of open
# connections is decremented and the program exits with $exit_value once no
# connection is left.
sub cmd_rx {
    my ($host,$s,$pbufref,$olbufref,$elbufref)=@_;
    my $data;
    return unless defined $s->recv($data,$TCP_MAX*2);
    unless (length($data)) {
        close $s;
        --$slave;
        exit $exit_value unless $slave;
        return;
    }
    #print "$host received ".length($data),"\n";
    $$pbufref.=$data;
    while (length($$pbufref)>=2) {
        my $frame_len=unpack('n',$$pbufref);
        #print "$host : unpacked length $frame_len\n";
        #die "$host: to long" if $frame_len>1024;
        last if length($$pbufref)<2+$frame_len;
        my $msg=substr($$pbufref,2,$frame_len);
        substr($$pbufref,0,2+$frame_len,'');    # drop the consumed frame
        #print "$host : remaining pbuf length ",length($$pbufref),"\n";
        #print "$host : message length ",length($msg),"\n";
        cmd_msg($host,$s,$msg,$olbufref,$elbufref);
    }
    Donald::Select::reader_requeue();
}
# Process one frame received from a remote command. The first character of
# the frame selects the channel:
#   'X' exit status of the command; a non-zero value sets $exit_value to 1
#   'O' standard output data
#   'E' standard error data
# Output data is printed line by line, prefixed with the host name (to the
# default output for 'O', to STDERR for 'E'); an incomplete last line is kept
# in the matching buffer ($olbufref / $elbufref) until more data arrives.
sub cmd_msg {
    my ($host,$s,$data,$olbufref,$elbufref)=@_;
    my $channel=substr($data,0,1);
    if ($channel eq 'X') {
        my $status=substr($data,1);
        $exit_value=1 if $status!=0;
        return;
    }
    if ($channel eq 'O') {
        my $pending=$$olbufref.substr($data,1);
        print "$host $1" while $pending=~s/\A([^\n]*\n)//;
        $$olbufref=$pending;
    } elsif ($channel eq 'E') {
        my $pending=$$elbufref.substr($data,1);
        print STDERR "$host $1" while $pending=~s/\A([^\n]*\n)//;
        $$elbufref=$pending;
    }
}
#------------------------------------------------------------
# Usage text, printed when no mode option or an invalid option is given.
# The here-document is interpolating so that $0 shows the program name (it
# used to be printed literally as "$0").
use constant USAGE => <<"__EOF__";
usage: $0 [options]
--push file # broadcast this file
--push-amd-tar # broadcast /etc/amd
--send-restart # broadcast a restart request to all nodes
--exec mkmotd # execute /usr/sbin/mkmotd.pl on all nodes
--flush-gidcache # flush rpc auth.unix.gid cache on all nodes
--lsof=pattern
--kill # try to kill a running server
--daemon # start a daemon
--kill # try to kill previous server first
--foreground # stay in foreground, log to stderr
--syslog # log to syslog instead of stderr
__EOF__
use Getopt::Long;
GetOptions
(
    'kill' => \$options{'kill'},
    'daemon' => \$options{'daemon'},
    'push=s' => \$options{'push'},
    'exec=s' => \$options{'exec'},
    'foreground' => \$options{'foreground'},
    'syslog' => \$options{'syslog'},
    'push-amd-tar' => \$options{'push_amd_tar'},
    'send-restart' => \$options{'send-restart'},
    'flush-gidcache' => \$options{'flush-gidcache'},
    'lsof=s' => \$options{'lsof'},
) or die USAGE;

# Common preparation of the one-shot sender modes: load the cluster password
# and open the UDP socket used for sending. Dies if either step fails.
my $open_sender = sub {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=Donald::Select::INET->new(Proto=>'udp') or die "$!\n";
    return $donald_s;
};

# Exactly one mode is executed; the first matching option wins.
if (defined $options{'push'}) {
    push_file($open_sender->(),$options{'push'});
} elsif (defined $options{'exec'}) {
    send_exec($open_sender->(),$options{'exec'});
} elsif (defined $options{'push_amd_tar'}) {
    push_amd_tar($open_sender->());
} elsif (defined $options{'send-restart'}) {
    udp_broadcast_message($open_sender->(),'restart');
} elsif (defined $options{'flush-gidcache'}) {
    udp_broadcast_message($open_sender->(),'flush-gidcache');
} elsif (defined $options{'daemon'}) {
    $options{'kill'} and Donald::Tools::kill_previous_server('clusterd') and sleep 2;
    $SIG{PIPE}='IGNORE';
    $donald_s=Donald::Select::INET->new(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n";
    $donald_s->receive_data(\&udp_message,$donald_s);
    unless ($options{'foreground'}) {
        # go to the background: the parent exits, the child carries on
        my $pid=fork;
        defined $pid or die "$!\n";
        $pid and exit 0;
    }
    if ($options{'syslog'} or not $options{'foreground'}) {
        openlog('clusterd','pid','daemon');
        # The message is passed as an argument of a fixed '%s' format. syslog
        # treats its second argument as a printf-style format string, so
        # messages containing '%' (e.g. "pload>150% ...") must not be used as
        # the format themselves.
        $SIG{__WARN__} = sub { syslog('warning','%s',join('',@_)); };
        $SIG{__DIE__} = sub { syslog('crit','%s',join('',@_));syslog('crit','exiting');exit 1;};
        open (STDOUT,'>','/dev/null');
        open (STDERR,'>','/dev/null');
        open (STDIN,'<','/dev/null');
    }
    check_progfile_status();
    warn "server started - ".version_info()."\n";
    init_area();
    mgmt_init();
    clp_init();
    sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n";
    Donald::Select::timeout(60,\&purge_old_receiver);
    Donald::Select::timeout(rand(60),\&send_stat);
    Donald::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha();
    $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init();
    $my_hostname eq 'lol' and My::NetlogReceiver::init();
    Donald::Select::timeout(600,\&check_overload);
    Donald::Select::run();
} elsif ($options{'lsof'}) {
    lsof($options{'lsof'});
} elsif ($options{'kill'}) {
    Donald::Tools::kill_previous_server('clusterd');
} else {
    die USAGE;
}