Skip to content
Permalink
a98568a157
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
executable file 1502 lines (1267 sloc) 39.1 KB
#! /usr/bin/perl
use warnings;
use strict;
# https://github.molgen.mpg.de/mariux64/clusterd
our $REVISION='1.107';
#use lib ('/home/buczek/cluster/Donald/blib/lib');
use Donald::Tools qw(encode sign check_sign decode);
use Donald::Select;
use Donald::FileInfo;
use Donald::Select::INET;
use POSIX;
use IO::Pipe;
#--------------------------------------
package My::Cluster::Updown;
#
# monitor nodes
#
our %H; # ( name=> [state , last_seen , expect_seq , @data ] , ... )
our $MONITOR_STARTING=1;
our %DUPLICATES;
# data currently:
# 0: float load-average
# 1: clusterd version
# 2: processor load
# 3: processor capacity
# 4: string unix version
use Storable;
# Write the node table %H to the monitor state file so that a restarted
# monitor can pick it up again via restore_state().
sub save_state {
    my $state_file = '/root/clusterd.monitor.state';
    store(\%H, $state_file);
}
# Load the node table %H from the monitor state file, if there is one.
#
# A damaged state file (for example one truncated while it was being written)
# makes Storable's retrieve() die; that must not prevent the daemon from
# starting, so the failure is reported and the monitor begins with whatever
# %H already holds.
#
# In every case the last_seen stamp (slot 1) of each known node is reset to 0.
sub restore_state {
    my $state_file = '/root/clusterd.monitor.state';
    if (-e $state_file) {
        my $saved = eval { retrieve($state_file) };
        if (ref($saved) eq 'HASH') {
            %H = %$saved;
        } else {
            warn "$state_file: unusable monitor state ignored", ($@ ? ": $@" : "\n");
        }
    }
    $_->[1] = 0 for values %H;
}
# Print a cluster overview to $f (any object offering print and printf):
#   - the names of all UP and all DOWN nodes,
#   - the ten UP nodes with the highest value in slot 3 (load average),
#   - the ten UP nodes with the highest value in slot 5 (processor load),
#   - the ten UP nodes with the highest value in slot 6 (free capacity),
#   - the sum of slot 6 over all UP nodes.
#
# Each "TOP 10" list is limited to ten entries; the limit is checked before
# printing (checking it afterwards let an eleventh host through).
sub status {
    my ($f)=@_;
    my $limit = 10;

    my @up   = grep { $H{$_}->[0] eq 'UP' }   sort keys %H;
    my @down = grep { $H{$_}->[0] eq 'DOWN' } sort keys %H;
    $f->printf("UP (%d): %s\n\n",scalar(@up),join(' ',@up));
    $f->printf("DOWN (%d): %s\n\n",scalar(@down),join(' ',@down));

    my $printed = 0;
    $f->print("TOP 10: ");
    for my $host (sort { $H{$b}->[3] <=> $H{$a}->[3] } @up) {
        last if $printed++ >= $limit;
        $f->print(sprintf "%s (%4.2f) ",$host,$H{$host}->[3]);
    }
    $f->print("\n\n");

    $printed = 0;
    $f->print("TOP 10 CPU load : ");
    for my $host (sort { ($H{$b}->[5]||0) <=> ($H{$a}->[5]||0) } @up) {
        last if $printed++ >= $limit;
        $f->print(sprintf "%s (%3.1f%%) ",$host,($H{$host}->[5]||0)*100);
    }
    $f->print("\n\n");

    $printed = 0;
    my $total_bogomips = 0;
    $f->print("TOP 10 free capacity : ");
    for my $host (sort { ($H{$b}->[6]||0) <=> ($H{$a}->[6]||0) } @up) {
        my $bogo = $H{$host}->[6]||0;
        # only the first ten are listed, but every UP node counts for the total
        $f->print(sprintf "%s (%3.1f) ",$host,$bogo) if $printed++ < $limit;
        $total_bogomips += $bogo;
    }
    $f->printf("\ntotal available bogomips: %d\n\n",$total_bogomips);
}
# Report $text twice: once via warn, and once, prefixed with the local time,
# through main::mgmt_print_all().
sub msg_text {
    my ($text)=@_;
    warn "$text\n";
    my $stamp = localtime;    # scalar assignment: human readable time string
    main::mgmt_print_all("$stamp: $text\n");
}
# Format a state transition ("OLD -> NEW host extra") in fixed-width columns
# and hand it to msg_text().
sub msg_state {
    my ($state_old,$state_new,$hostname,$extra)=@_;
    my $line = sprintf '%-4s -> %-4s %-20s %s',
        $state_old, $state_new, $hostname, $extra;
    msg_text($line);
}
# Start the node monitor: reload the saved node table, announce recovery mode
# and register two timers due after 630 seconds - one that ends recovery mode
# ($MONITOR_STARTING=0) and one that runs timeout_hosts().
sub init {
    restore_state();
    msg_text('node monitor: started. (recovery mode)');
    my $end_recovery = sub {
        $MONITOR_STARTING = 0;
        msg_text('node monitor: recovery finished');
    };
    Donald::Select::timeout(630, $end_recovery);
    Donald::Select::timeout(630, \&timeout_hosts);
}
# Timer callback (re-queues itself for 60 seconds): every node still marked UP
# whose last_seen stamp is 1230 seconds or more in the past is reported and
# marked DOWN; afterwards the node table is saved.
sub timeout_hosts {
    Donald::Select::timeout_requeue(60);
    my $deadline = Donald::Select::time() - 1230;    # 2x10 minutes + 30 seconds
    for my $name (keys %H) {
        my $node = $H{$name};
        next unless $node->[0] eq 'UP';
        next if $node->[1] > $deadline;
        msg_state('UP','DOWN',$name,"timeout!");
        $node->[0] = 'DOWN';
    }
    save_state();
}
# Process one host announcement and update the node's entry in %H.
#
# Arguments: host name, the sender's sequence number, then the data fields
# (load average, clusterd version, processor load, processor capacity, unix
# revision). The data fields are stored unmodified behind the three
# bookkeeping slots [state, last_seen, expected_seq].
#
# The received sequence number is compared with the expected one ($h->[2]) to
# choose the message to log. Whatever branch is taken, the node ends up 'UP',
# stamped with the current Donald::Select::time and expecting $seq+1 next.
# Several messages are suppressed while $MONITOR_STARTING is set.
sub rx_hostannounce {
# ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)
my ($host,$seq,@more)=@_;
my ($load_average,$version,$pload,$pcapacity,$unixrev)=@more;
# the '?' default is only used in log messages; @more is stored as received
$unixrev ||= '?';
unless (exists $H{$host}) {
# first announcement ever seen from this host
if ($seq==0) {
msg_state('NEW','UP',$host,"discovered new node $version - $unixrev");
} else {
$MONITOR_STARTING or msg_state('NEW','UP',$host,"discovered running node $version - $unixrev");
}
$H{$host}=['UP',Donald::Select::time,$seq+1,@more];
} else {
my $h=$H{$host};
if ($h->[0] eq 'UP') {
if ($seq == $h->[2]) {
# exactly the expected packet: nothing to report
;
} else {
if ($seq==0) {
# sequence restarted at 0 while the node was considered UP: not reported
# msg_state('UP','UP',$host,"node rebootet $version - $unixrev");
} elsif ($seq>$h->[2]) {
# gap in the sequence numbers
$MONITOR_STARTING or msg_state('UP','UP',$host,$seq-$h->[2]." packet(s) lost!");
} elsif ($seq+1==$h->[2]) {
# same sequence number as the previous packet: counted in %DUPLICATES
warn "DUPLICATE from $host\n";
$DUPLICATES{$host}=($DUPLICATES{$host}||0)+1;
} else {
# sequence number went backwards to a non-zero value
msg_state('UP','UP',$host,"node rebooted and ".$seq." packet(s) lost!");
}
}
} else {
# node was not UP before: always report the transition back to UP
if ($seq==0) {
msg_state('DOWN','UP',$host,"node rebooted $version - $unixrev");
} elsif ($seq==$h->[2]) {
msg_state('DOWN','UP',$host,"sequence error. Node to slow? (seq=$seq,exp=".$h->[2].")");
} else {
msg_state('DOWN','UP',$host,$seq-$h->[2]." packet(s) lost");
}
}
# overwrite the existing array in place so $H{$host} keeps its reference
@$h=('UP',Donald::Select::time,$seq+1,@more);
}
}
# Remove $host from the node table; a message is logged only when the host
# was actually known.
sub delete_host {
    my ($host)=@_;
    return unless delete $H{$host};
    msg_text("host $host removed from monitor");
}
#-----------------------------------------------------------------------
package My::NetlogReceiver;
our $listen_socket;
our $TCP_MAX=1024;
our $DAY_LAST_MSG;
# Return the current local date as an eight digit string YYYYMMDD.
sub day {
    my ($mday,$mon,$year) = (localtime)[3,4,5];
    return sprintf "%04d%02d%02d", $year+1900, $mon+1, $mday;
}
# Emit a separator line once per calendar day: when day() is later than the
# day remembered in $DAY_LAST_MSG, warn and remember the new day.
sub bigben {
    my $today = day();
    return if $today le $DAY_LAST_MSG;
    warn "NETLOG ==================================================== morning has broken ====\n";
    $DAY_LAST_MSG = $today;
}
# Timer callback: run the day-change check, then re-queue this timer to fire
# again in 30 seconds.
sub bigben_timer {
    bigben();
    my $interval = 30;
    Donald::Select::timeout_requeue($interval);
}
# Remember today's date as the day of the last separator and register
# bigben_timer() to run in 30 seconds.
sub bigben_init {
    $DAY_LAST_MSG = day();
    my $interval = 30;
    Donald::Select::timeout($interval, \&bigben_timer);
}
# Reader callback for one netlog connection.
#
# Appends newly received bytes to the per-connection buffer ($bufref) and
# extracts every complete record: a 16 bit big-endian length followed by that
# many payload bytes. Each payload is logged with a "NETLOG " prefix unless it
# already contains the text NETLOG. On a receive error or end of stream the
# callback returns without re-queueing itself.
sub receive {
    my ($socket,$peernode,$bufref)=@_;
    bigben();
    my $chunk;
    return unless defined $socket->recv($chunk,$TCP_MAX);
    return unless length $chunk;
    $$bufref .= $chunk;
    while (length($$bufref) >= 2) {
        my $msg_len = unpack('n',$$bufref);
        last if length($$bufref) < 2+$msg_len;    # record not complete yet
        my $msg = substr($$bufref,2,$msg_len);
        $$bufref = substr($$bufref,2+$msg_len);
        $| = 1;
        warn "NETLOG $msg\n" unless $msg =~ /NETLOG/;
    }
    Donald::Select::reader_requeue();
}
# Reader callback for the netlog listen socket: accept one connection, make it
# non-blocking and register receive() as its reader, together with the peer
# address and a fresh receive buffer.
#
# accept() may fail (for example when the peer has already gone away); in that
# case there is nothing to register and the callback simply returns instead of
# calling a method on an undefined value.
sub connect_request {
    # keep listening, whatever happens to this particular connection
    Donald::Select::reader_requeue();
    my $socket = $listen_socket->accept() or return;
    $socket->blocking(0);
    my $peernode = $socket->peerhost;
    my $buffer = '';
    Donald::Select::reader($socket,\&receive,$socket,$peernode,\$buffer);
}
# Open the netlog TCP listen socket on port 1028 (dies with the system error
# when that fails), register connect_request() as its reader and start the
# day-change timer.
sub init {
    $listen_socket = IO::Socket::INET->new(
        Proto     => 'tcp',
        LocalPort => 1028,
        Listen    => 1,
        ReuseAddr => 1,
    ) or die "$!\n";
    Donald::Select::reader($listen_socket,\&connect_request);
    bigben_init();
}
#------------------------------------------------------------------------
package main;
use strict;
use IO::File;
use Sys::Syslog;
use IO::Socket::INET;
use Data::Dumper;
our $UDP_MAX=1472; # for broadcast on alphas
our $UDP_PORT=234;
our $BC_RATE=8; # packets per second broadcast
our (%options); # RUN OPTIONS
our $donald_s; # Donald::Select::INET udp socket
our $my_hostname;
our $my_ip; # '141.14.12.12'
# Determine our own short host name and its IP address at startup.
$my_hostname=lc `/bin/hostname`;
chomp($my_hostname);
$my_hostname =~ s/\.molgen\.mpg\.de$//;
{
    # The flag lives outside the loop so that the "waiting" message is really
    # printed only once, not on every 30 second retry.
    my $warned;
    while (1) {
        my $addr=inet_aton($my_hostname);
        if (defined $addr) {
            $my_ip=inet_ntoa($addr);
            last;
        }
        unless ($warned) {
            warn "no IP (yet)= - waiting\n";
            $warned=1;
        }
        sleep 30;
    }
}
our $my_unixrev;
$my_unixrev=`uname -r`;
chomp($my_unixrev);
our $CLUSTER_PW;
our $CLUSTER_PW_FILE='/etc/clusterd.password';
our $OLD_CLUSTER_PW_FILE='/root/clusterd.password';
our $CLUSTER_PW_TIMESTAMP=0;
$ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}:''); # for amq , ps , tar (gnu!)
#---------------------------------------------------------- UDP
our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234)
our %UDP_HANDLER=
(
'filedata' => \&udp_rx_filedata,
'filedata.2' => \&udp_rx_filedata,
'amdtardata' => \&udp_rx_amdtardata,
'loadavg.2' => \&udp_rx_loadavg2,
'restart' => \&udp_rx_restart,
'flush-gidcache' => \&udp_rx_flush_gidcache,
'log' => \&udp_rx_log,
'exec' => \&udp_rx_exec,
);
# Entry point for a received UDP datagram.
#
# Records the sender address/port in the file-level $udp_peer_addr and
# $udp_peer_port, verifies the signature with the cluster password, decodes
# the payload into (handler name, arguments) and dispatches through
# %UDP_HANDLER. Datagrams are dropped silently when no password is loaded,
# when verification/decoding yields nothing, or when the handler is unknown.
sub udp_message {
    my ($data,$x_udp_peer_addr,$x_udp_peer_port,$donald_s)=@_;
    $udp_peer_addr = $x_udp_peer_addr;
    $udp_peer_port = $x_udp_peer_port;
    return unless defined $CLUSTER_PW;
    my ($handler_name,@args) = decode(check_sign($CLUSTER_PW,$data)) or return;
    exists $UDP_HANDLER{$handler_name} or return;
    $UDP_HANDLER{$handler_name}->(@args);
}
# udp_send_message( dst, @args )
# Encode and sign @args with the cluster password and send the result to
# dst (e.g. '141.14.31.255', 'zork', '141.14.16.1') on $UDP_PORT.
# Does nothing while no cluster password is loaded.
sub udp_send_message {
    my ($ip,@args)=@_;
    return unless defined $CLUSTER_PW;
    my $packet = sign($CLUSTER_PW,encode(@args));
    $donald_s->send_data($ip,$UDP_PORT,$packet);
}
#----------------------------------------------------------
# Pack /etc/amd into /tmp/amd.tar, compress it to /tmp/amd.tar.gz and
# broadcast the compressed archive in 1024 byte pieces as 'amdtardata'
# messages. Each message carries the file info of the archive, the position of
# the piece, the piece itself and the MD5 digest of the uncompressed tar.
# Returns early (after a warning where one is coded) if any step fails.
# NOTE(review): Digest::MD5 is used here but no `use Digest::MD5` is visible in
# this part of the file - presumably it is loaded elsewhere; confirm.
sub push_amd_tar {
my ($donald_s)=@_;
my $filename='/tmp/amd.tar';
my $pid=fork;
defined $pid or return warn "$!\n";
unless($pid) {
# child: create the tar archive from within /etc/amd
chdir '/etc/amd' or die "/etc/amd: $!\n";
exec 'tar','cf',$filename,'.';
die "$!\n";
}
wait;
# non-zero status from the child: give up silently
$? and return;
my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n";
# digest of the uncompressed archive; sent along with every piece
my $digest=Digest::MD5->new->addfile($fh)->digest;
warn "tar digest is ",Digest::MD5::md5_hex($digest),"\n";
$pid=fork;
defined $pid or return warn "$!\n";
unless($pid) {
# child: compress the archive (gzip replaces it by $filename.gz)
exec 'gzip','-f',$filename;
die "$!\n";
}
wait;
$? and return;
$filename='/tmp/amd.tar.gz';
my $st=Donald::FileInfo->lstat($filename);
defined $st or return warn "$filename: $!\n";
$st->type eq 'F' or return warn "$filename: not a plain file\n";
$fh=new IO::File $filename,'<' or return warn "$filename: $!\n";
my $i=0;
for (my $pos=0;$pos<$st->size;$pos+=1024) {
my $data;
defined $fh->sysread($data,1024) or return warn "$filename: $!\n";
# warn "send bytes $pos to ",$pos+length($data),"\n";
# rate limit: pause one second after every $BC_RATE pieces
++$i % $BC_RATE or sleep 1;
udp_broadcast_message($donald_s,'amdtardata',$st,$pos,$data,$digest);
}
}
# Broadcast one file to the cluster.
#
# $filename must be an absolute path. A plain file (type 'F') of at most 65536
# bytes is sent as 'filedata' messages of up to 1024 bytes each, an empty file
# as a single message with empty data. A symbolic link (type 'L') is sent as a
# single 'filedata.2' message with empty data. Every message carries the
# Donald::FileInfo object of the file.
# Dies for files that are too big and for unsupported file types; other
# failures produce a warning and an early return.
sub push_file {
my ($donald_s,$filename)=@_;
$filename =~ m"^/" or return warn "$filename: please use absolute path\n";
my $st=Donald::FileInfo->lstat($filename);
defined $st or return warn "$filename: $!\n";
my $rpc;
if ($st->type eq 'F') {
$rpc='filedata';
$st->size<=65536 or die "$filename: to big for broadcast (max 65536 bytes)\n";
if ($st->size==0) {
# empty file: one message without payload is enough
udp_broadcast_message($donald_s,$rpc,$st,0,'');
return;
}
my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n";
my $i=0;
for (my $pos=0;$pos<$st->size;$pos+=1024) {
my $data;
defined $fh->sysread($data,1024) or return warn "$filename: $!\n";
# warn "send bytes $pos to ",$pos+length($data),"\n";
udp_broadcast_message($donald_s,$rpc,$st,$pos,$data);
# rate limit: pause one second after every $BC_RATE pieces
++$i % $BC_RATE or sleep 1;
}
} elsif ($st->type eq 'L') {
$rpc='filedata.2';
udp_broadcast_message($donald_s,$rpc,$st,0,'');
return;
} else {
die "file type not supported\n";
}
}
our %CMD=(
'mkmotd'=>'/usr/sbin/mkmotd.pl',
);
# Broadcast an 'exec' request for the command registered under $cmd in %CMD.
# Dies with the list of available command names when $cmd is not registered.
sub send_exec {
    my ($donald_s,$cmd)=@_;
    exists $CMD{$cmd}
        or die "available commands: ",join(' , ',keys %CMD),"\n";
    udp_broadcast_message($donald_s,'exec',$cmd);
}
# Handler for the 'exec' message: run the command line registered under $cmd
# in %CMD, detached from the daemon. Names that are not in %CMD are ignored,
# so only the registered commands can be started remotely.
sub udp_rx_exec {
my ($cmd)=@_;
warn "exec $cmd\n";
exists $CMD{$cmd} or return;
my $pid;
$pid=fork;
unless (defined $pid) {
warn "$!\n";
return;
}
unless ($pid) {
# first child: fork again and exit, so that the daemon's wait below
# returns immediately and the command is not a child of the daemon
$pid=fork;
defined $pid or exit 1;
$pid and exit;
# grandchild: detach the standard handles from the daemon's
open STDIN,'<','/dev/null';
open STDOUT,'>','/dev/null';
open STDERR,'>','/dev/null';
# SIGALRM after 60 seconds; the timer is preserved across exec
alarm(60);
chdir '/';
exec '/bin/sh','-c',$CMD{$cmd};
exit 1;
}
# reap the short-lived first child
wait;
}
#-------------------------------------------------------------
# normalize_seg( [pos,len], [pos,len], ... )
# Sort the segments by position and merge every segment that overlaps or
# directly touches its predecessor. Returns the resulting list of [pos,len]
# pairs (in scalar context: their number). A merged segment re-uses the array
# of the earlier of the two segments, whose length is updated in place.
sub normalize_seg {
    my @merged;
    for my $seg (sort { $a->[0] <=> $b->[0] } @_) {
        my $prev = $merged[-1];
        if ($prev && $prev->[0]+$prev->[1] >= $seg->[0]) {
            # joinable: extend the previous segment to the farther end
            my $prev_end = $prev->[0]+$prev->[1];
            my $seg_end  = $seg->[0]+$seg->[1];
            $prev->[1] = ($prev_end > $seg_end ? $prev_end : $seg_end) - $prev->[0];
        } else {
            push @merged, $seg;
        }
    }
    return @merged;
}
my %RECEIVER; # ( filename => $receiver, .... )
# $receiver : [ st_want , last_rx , io_handle , [ [pos,len] , [pos,len] , ... ] ]
# Timer callback (re-queues itself for 60 seconds): drop every receiver in
# %RECEIVER that has not received data for more than 10 seconds, reporting the
# timeout locally and to the stat target.
sub purge_old_receiver {
    while (my ($name,$rx) = each %RECEIVER) {
        next unless $rx->[1]+10 < Donald::Select::time();
        my $file = $rx->[0]->name;
        warn 'timeout receiving ',$file,"\n";
        log_to_stat_target('timeout receiving ',$rx->[0]->name);
        delete $RECEIVER{$name};    # deleting the current each() key is safe
    }
    Donald::Select::timeout_requeue(60);
}
#-------------------------------------------------------------
our $INSTALLED_DIGEST='';
our $rx_filedata_done;
# Handler for the 'amdtardata' message: receive one piece of the compressed
# /etc/amd archive and, once the archive is complete, unpack it into /etc/amd.
#
# Arguments: file info of the archive, position of the piece, the piece and
# the digest identifying the archive contents. Pieces of an archive whose
# digest equals $INSTALLED_DIGEST are ignored.
#
# The unpack/install steps run only for the piece that completes the file
# (udp_rx_filedata() sets $rx_filedata_done), and only the tar child started
# here is waited for. Previously these steps ran after every piece, so the
# bare wait could block on, or pick up the exit status of, an unrelated child.
sub udp_rx_amdtardata {
    my ($st_want,$pos,$data,$digest)=@_;
    ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo';
    $digest eq $INSTALLED_DIGEST and return;
    udp_rx_filedata($st_want,$pos,$data);
    $rx_filedata_done or return;    # archive not complete yet

    my $pid=fork;
    defined $pid or return warn "$!\n";
    unless ($pid) {
        chdir '/etc/amd' or die "/etc/amd: $!\n";
        exec 'tar','xzf',$st_want->name;
        die "$!\n";
    }
    waitpid($pid,0);
    $? and return;                  # tar failed: do not mark as installed
    warn "installed /etc/amd - ",Digest::MD5::md5_hex($digest),"\n";
    $INSTALLED_DIGEST=$digest;
    system 'amq','-f';
    system '/sbin/make-automaps';
}
# Architecture dependent setup, keyed on the output of `uname -m`:
#   $SYS_lchown / $SYS_mknod : syscall numbers, left undefined on unknown
#                              architectures
#   $lmtime_sub              : code ref ($path,$mtime) that sets the mtime of
#                              a symlink itself; only x86_64 has a working
#                              implementation, the others just warn
our ($machine,$SYS_lchown,$SYS_mknod,$lmtime_sub);
chomp($machine=`uname -m`);
if ($machine eq 'i686') {
$SYS_lchown=198; # __NR_lchown32 in /usr/include/asm/unistd.h
$SYS_mknod=14; # __NR_mknod
} elsif ($machine eq 'x86_64') {
$SYS_lchown=94; # __NR_lchown in /usr/include/asm-x86_64/unistd.h
$SYS_mknod=133; # __NR_mknod
} elsif ($machine eq 'alpha') {
$SYS_lchown=208; # SYS_lchown in /usr/include/syscall.h
$SYS_mknod=14; # SYS_mknod
} elsif ($machine eq 'amd64') {
$SYS_lchown=254; # SYS_lchown in /usr/include/syscall.h
$SYS_mknod=14; # SYS_mknod
} else {
warn "unknown machine type $machine: symlink ownership can't be set.\n";
warn "unknown machine type $machine: named pipes,character and block devices can't be created\n";
}
if ($machine eq 'x86_64') {
our $SYS_utimensat=280; # /usr/include/asm/unistd_64.h
our $AT_FDCWD=-100; # /usr/include/fcntl.h
our $UTIME_OMIT=(1<<30)-2; # /usr/include/bits/stat.h
our $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/fcntl.h
$lmtime_sub=sub {
my ($path,$mtime)=@_;
# two (sec,nsec) pairs: access time (nsec=UTIME_OMIT: leave unchanged)
# followed by modification time ($mtime seconds, 0 nanoseconds)
my $tsa=pack 'qqqq',0,$UTIME_OMIT,$mtime,0;
syscall($SYS_utimensat,$AT_FDCWD,$path,$tsa,$AT_SYMLINK_NOFOLLOW)==0 or return warn "$path: failed to lmtime: $!\n";
}
} else {
$lmtime_sub=sub {
my ($path,$mtime)=@_;
warn "$path: don't known how to change symlink mtime on target architecture\n";
}
}
# lchown( uid, gid, path )
# Change the ownership of $path itself (not of a symlink's target) through the
# raw lchown syscall. Does nothing when no syscall number is known for this
# architecture; warns when the syscall fails.
sub lchown {
    my ($uid,$gid,$path)=@_;
    return unless $SYS_lchown;
    my $rc = syscall($SYS_lchown,$path,$uid+0,$gid+0);
    $rc == 0 or return warn "$path: failed to lchown: $!\n";
}
# lmtime( mtime, path )
# Set the modification time of $path itself by delegating to the architecture
# specific $lmtime_sub (which takes its arguments in the order path, mtime).
# Does nothing when no such routine is installed.
sub lmtime {
    my ($mtime,$path)=@_;
    return unless $lmtime_sub;
    return $lmtime_sub->($path,$mtime);
}
# Handler for 'filedata' / 'filedata.2' messages: receive (a piece of) a file
# or a symlink and install it under the name recorded in $st_want.
#
# Arguments: file info of the wanted file, position of the piece, the piece.
# Side effect: $rx_filedata_done is set to 1 when this call completed and
# installed a plain file, and to 0 otherwise.
#
#  - A plain file that already exists with the wanted size and mtime is
#    considered current and nothing is done.
#  - A symlink is (re)created if missing or pointing elsewhere; otherwise only
#    owner and mtime are corrected where they differ.
#  - A file that fits into a single piece is written to "$filename.tmp" and
#    renamed into place.
#  - Larger files are collected piece by piece through an entry in %RECEIVER
#    that tracks which byte ranges have arrived; when the ranges cover the
#    whole file it is renamed into place.
sub udp_rx_filedata {
    my ($st_want,$pos,$data)=@_;
    ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo';
    my $filename=$st_want->name;
    my $tmp_filename="$filename.tmp";
    $rx_filedata_done=0;
    my $st_is=Donald::FileInfo->lstat($st_want->name);
    if ($st_is && $st_is->type eq 'F' && $st_is->size==$st_want->size && $st_is->mtime==$st_want->mtime) {
        # file seems to be current
        return;
    }
    if ($st_want->type eq 'L') {
        if (!$st_is || $st_is->type ne 'L' || $st_is->target ne $st_want->target) {
            $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n");
            symlink($st_want->target,$filename) or return warn "$filename: failed to create symlink: $!\n";
            lchown($st_want->uid,$st_want->gid,$filename);
            lmtime($st_want->mtime,$filename);
            warn "installed $filename -> ".$st_want->target."\n";
        } else {
            if ($st_is->uid != $st_want->uid || $st_is->gid != $st_want->gid) {
                lchown($st_want->uid,$st_want->gid,$filename);
            }
            if ($st_is->mtime != $st_want->mtime) {
                lmtime($st_want->mtime,$filename);
            }
        }
        return;
    }
    if (length($data) == $st_want->size) {
        # complete file in one broadcast
        # O_TRUNC: a longer temp file left over from an aborted transfer must
        # not leave stale bytes behind the new contents
        my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_TRUNC,0);
        defined $fh or return warn "$tmp_filename: $!\n";
        defined $fh->syswrite($data) or return warn "$tmp_filename: $!\n";
        $fh->close;
        chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
        chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
        $st_is and unlink($filename);
        rename($tmp_filename,$filename);
        utime($st_want->mtime,$st_want->mtime,$filename);
        warn "installed $filename\n";
        $rx_filedata_done=1;
        return;
    }
    length($data) or return; # shouldn't happen.
    my $receiver=$RECEIVER{$st_want->name};
    if (defined $receiver) {
        # a receiver for another version of the file is of no use
        if ( $receiver->[0]->size != $st_want->size or $receiver->[0]->mtime != $st_want->mtime ) {
            $receiver=undef;
        }
    }
    unless (defined $receiver) {
        # create new receiver, starting from a fresh temp file
        -e $tmp_filename and unlink($tmp_filename);
        my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0);
        defined $fh or return warn "$tmp_filename: $!\n";
        $receiver = [$st_want,Donald::Select::time,$fh,[]];
        $RECEIVER{$filename}=$receiver;
    }
    {
        # write data ( size cant be 0 here )
        $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n";
        $receiver->[2]->syswrite($data) or return warn "$tmp_filename: $!\n";
        $receiver->[1]=Donald::Select::time;
        # record the received range and merge it with the ones already there
        my $s=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])];
        # all there ? (a single range covering the whole file)
        if (@$s == 1 && $s->[0]->[0]==0 && $s->[0]->[1]==$st_want->size) {
            $receiver->[2]->close;
            chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
            chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
            $st_is and unlink($filename);
            rename($tmp_filename,$filename);
            utime($st_want->mtime,$st_want->mtime,$filename);
            warn "installed $filename\n";
            delete $RECEIVER{$filename};
            $rx_filedata_done=1;
        }
    }
}
#----------------------------------------------------------- sample running proc every 5 seconds
our @proc_running=(0)x12; # 12 samples every 5 seconds -> minute average
our @proc_running_10=(0)x10; # 10 samples every minute -> 10 minute average
our $SAMPLE_TICK=0;
# Return the number from the "procs_running" line of /proc/stat, or 0 when
# the file cannot be opened or has no such line.
#
# A lexical file handle and a lexical line variable are used so that the
# handle is closed on every return path and the caller's $_ is left alone.
sub xget_cur_running_proc {
    open my $stat,'<','/proc/stat' or return 0;
    while (my $line=<$stat>) {
        return $1 if $line =~ /^procs_running\s*(\d+)/;
    }
    return 0;
}
# Return the current number of running processes. When the CPU count is known
# and the number exceeds it by more than one, a debug snapshot is written via
# pload_debug() first.
sub get_cur_running_proc {
    our $CPUS;
    my $running = xget_cur_running_proc();
    pload_debug("more than $CPUS+1 proc") if $CPUS && $running > $CPUS+1;
    return $running;
}
# Return the arithmetic mean of the samples held in @proc_running.
sub running_proc {
    my $sum = 0;
    for my $sample (@proc_running) {
        $sum += $sample;
    }
    return $sum / scalar(@proc_running);
}
# Return the arithmetic mean of the samples held in @proc_running_10.
sub running_proc_10 {
    my $sum = 0;
    for my $sample (@proc_running_10) {
        $sum += $sample;
    }
    return $sum / scalar(@proc_running_10);
}
# Timer callback, every 5 seconds: push a new sample (running processes minus
# one) into the sliding window @proc_running, dropping the oldest one.
# After every 12th sample, i.e. once per full window, the window average is
# pushed into @proc_running_10 in the same way. (The counter used to let 13
# samples pass between two pushes, one more than the window holds.)
sub sample_rproc { # every 5 seconds
    # take the sample before touching the window: get_cur_running_proc() may
    # write a debug snapshot that reads @proc_running
    my $sample = get_cur_running_proc()-1;
    shift @proc_running;
    push @proc_running,$sample;
    if (++$SAMPLE_TICK >= 12) {
        my $minute_average = running_proc();
        shift @proc_running_10;
        push @proc_running_10,$minute_average;
        $SAMPLE_TICK=0;
    }
    Donald::Select::timeout_requeue(5);
}
#----------------------------------------------------------- stat
our ($CPUS,$BOGOMIPS);
# Fill $CPUS and $BOGOMIPS from /proc/cpuinfo: every "bogomips" line counts as
# one CPU and its value is added to $BOGOMIPS (so $BOGOMIPS is the sum over
# all CPUs). Does nothing on alpha or when the file cannot be opened.
#
# Uses a lexical file handle and line variable so that the handle is closed
# again and the caller's $_ is not overwritten.
sub init_cpuinfo {
    Donald::Tools::is_alpha() and return;
    open my $cpuinfo,'<','/proc/cpuinfo' or return;
    while (my $line=<$cpuinfo>) {
        if ($line =~ /^bogomips\s*:\s*([\d.]+)/) {
            $CPUS++;
            $BOGOMIPS+=$1;
        }
    }
    close $cpuinfo;
}
# Return the load figures of this machine.
#   alpha : (system load average), obtained through a raw syscall
#   linux : (system load average, pload, free capacity) when the CPU count is
#           known, otherwise just (system load average)
# pload is the averaged number of running processes divided by the number of
# CPUs; free capacity is 0 when at least $CPUS processes are running and
# otherwise (idle CPU count) * $BOGOMIPS.
# Returns undef when the load average cannot be determined.
# NOTE(review): $BOGOMIPS is accumulated over all CPUs by init_cpuinfo(), so
# the capacity multiplies the idle CPU count by the total, not a per-CPU
# value - confirm that this is intended.
sub loadavg { # AXP : (system load average) , LINUX: (system load average, pload, freebogo)
if (Donald::Tools::is_alpha) {
# pack without values only allocates a buffer of the right size for the call
my $data=pack 'l!3lx4l!3';
my $i=syscall(85,3,0,$data,1,length($data)); # table(id=TBL_LOADAVG,index=0,addr=data,nel=1,lel=56)
my ($l0,$l1,$l2,$scale,$mach0,$mach1,$mach2)=unpack 'l!3lx4l!3',$data;
$scale or return undef;
return $l0/$scale;
} else {
$CPUS or init_cpuinfo();
my $running_proc=running_proc();
open L,'<','/proc/loadavg' or return undef;
my $data;
sysread(L,$data,1024);
close L;
$data =~ /^(\d+\.?\d*)/ or return undef; # first field of /proc/loadavg (per proc(5) the 1 minute load average)
if ($CPUS) {
return ($1+0,($running_proc)/$CPUS,($running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS));
} else {
return $1+0;
}
}
}
our $STAT_TARGET='lol';
our $STAT_SEQ=0;
# Timer callback (re-queues itself for 600 seconds): send a 'loadavg.2'
# announcement with host name, running sequence number, load figures, version
# string and unix revision to $STAT_TARGET. Nothing is sent (and the sequence
# number is not consumed) when the load average is unavailable.
sub send_stat {
    Donald::Select::timeout_requeue(600);
    my ($load_avg,$pload,$pcapacity)=loadavg();
    return unless defined $load_avg;
    my @announce = ($my_hostname,$STAT_SEQ++,$load_avg,version_info(),$pload,$pcapacity,$my_unixrev);
    udp_send_message($STAT_TARGET,'loadavg.2',@announce);
}
# Handler for the 'loadavg.2' message: on the stat target host, forward the
# announcement to the node monitor; every other host ignores it.
sub udp_rx_loadavg2 {
    my ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)=@_;
    return unless $my_hostname eq $STAT_TARGET;
    My::Cluster::Updown::rx_hostannounce(
        $hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev
    );
}
# Handler for the 'log' message: pass the received text on to the node
# monitor's message output.
sub udp_rx_log {
    my $msg = shift;
    My::Cluster::Updown::msg_text($msg);
}
# Concatenate all arguments into one text and send it, prefixed with our host
# name, as a 'log' message to $STAT_TARGET.
sub log_to_stat_target {
    my $text = join('',@_);
    udp_send_message($STAT_TARGET,'log',"$my_hostname: $text");
}
# ----------------------------------------------------------
# Handler for the 'restart' message: start a fresh clusterd with
# '--kill --daemon' from a grandchild process.
#
# double-fork, because kill_previous_server() won't kill its parent: the
# intermediate child exits at once, so the new clusterd is no longer a
# descendant of this daemon.
#
# Only the grandchild may exec. The second test used to look at $pid (always 0
# inside the first child) instead of $pid2, so the intermediate child exec'ed
# a second clusterd as well. The intermediate child now exits explicitly and
# is reaped here, so it neither lingers as a zombie nor returns into the
# daemon's main loop as a copy of the server.
sub udp_rx_restart {
    my $pid=fork;
    unless (defined $pid) {
        warn "restart: fork failed: $!\n";
        return;
    }
    if ($pid==0) {
        my $pid2=fork;
        defined $pid2 or exit 1;
        $pid2 and exit;             # intermediate child: done
        exec '/usr/sbin/clusterd','--kill','--daemon';
        warn "exec failed: $!\n";   # only reached when exec failed
        exit 1;
    }
    waitpid($pid,0);                # reap the short-lived intermediate child
}
# Handler for the 'flush-gidcache' message: write the current time to
# /proc/net/rpc/auth.unix.gid/flush; warn when the file cannot be opened.
sub udp_rx_flush_gidcache {
    open(my $out,'>','/proc/net/rpc/auth.unix.gid/flush')
        or return warn "proc/net/rpc/auth.unix.gid/flush: $!\n";
    print $out time();
}
#----------- tcp mgmt console -----------------------------
our $MGMT_PORT=234;
our $mgmt_listen_socket;
our %mgmt_sockets;
# Open the TCP listen socket of the management console on $MGMT_PORT (dies
# with the system error on failure) and register mgmt_connect_request() as its
# reader.
sub mgmt_init {
    $mgmt_listen_socket = IO::Socket::INET->new(
        LocalPort => $MGMT_PORT,
        Proto     => 'tcp',
        Listen    => 1,
        ReuseAddr => 1,
    );
    die "$!\n" unless defined $mgmt_listen_socket;
    Donald::Select::reader($mgmt_listen_socket,\&mgmt_connect_request);
}
# Reader callback for the management listen socket: accept one console
# connection, make it non-blocking, register mgmt_receive() as its reader,
# remember it in %mgmt_sockets (so that mgmt_print_all() reaches it) and send
# the greeting.
#
# accept() may fail (for example when the peer has already gone away); in that
# case the callback returns instead of calling a method on an undefined value.
sub mgmt_connect_request {
    # keep listening, whatever happens to this particular connection
    Donald::Select::reader_requeue();
    my $socket=$mgmt_listen_socket->accept() or return;
    $socket->blocking(0);
    Donald::Select::reader($socket,\&mgmt_receive,$socket);
    $mgmt_sockets{$socket}=$socket;
    $socket->print("clusterd ".version_info()." stupid console\n");
    $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n");
}
# Reader callback for one management console connection: read a command,
# execute it and print the answer to the same socket.
#
# The connection is closed and forgotten on read error, end of stream or a
# single ^D. Each read is treated as one command; a trailing line end is
# stripped and empty input is ignored. Unknown commands print the help text.
# %H and %DUPLICATES used below are the node monitor's tables: they were
# declared with `our` in package My::Cluster::Updown earlier in this file and
# that declaration is still in lexical scope here.
sub mgmt_receive {
my ($s)=@_;
my $data;
my $len=$s->sysread($data,1024);
if (!defined $len or $len==0 or $len==1 && $data eq "\cD") {
delete $mgmt_sockets{$s};
$s->close;
###warn "lost connection to mgmt port\n";
return;
}
Donald::Select::reader_requeue();
$data=~s/\r?\n$//;
length $data or return;
if ($data eq 'l') {
# cluster overview; the node table is only printed on the stat target host
$my_hostname eq $STAT_TARGET and My::Cluster::Updown::status($s);
$s->print("AREA: ",area_config_as_string(),"\n");
$s->print("STAT TARGET: ",$STAT_TARGET,"\n");
$s->print(' (',scalar(localtime),')',"\n");
}
elsif ($data eq 'r') {
# slot 7 of a node entry holds the unix revision it announced
for my $host (sort keys %H) {
if ($H{$host}->[0] eq 'UP') {
$s->printf("%-40s : %s\n",$host,$H{$host}->[7]);
}
}
} elsif ($data eq 'v') {
# one-line dump of every node entry
$Data::Dumper::Terse=1;
$Data::Dumper::Indent=0;
for (sort keys %H) {
$s->printf("%15s : %s\n",$_,Dumper($H{$_}));
}
} elsif ($data eq 'd') {
# show the raw samples and the figures derived from them
my $running_proc=running_proc();
my $running_proc_10=running_proc_10();
$s->printf("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc);
$s->printf("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10);
$CPUS or init_cpuinfo;
$CPUS and $s->printf("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS);
$CPUS and $s->printf("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS);
} elsif ($data =~ /^delete (\S+)$/) {
My::Cluster::Updown::delete_host($1);
} elsif ($data eq 'dup show') {
# hosts with the most duplicate packets first
for my $h (sort {$DUPLICATES{$b} <=> $DUPLICATES{$a} } keys %DUPLICATES) {
$s->printf("%-10s : %5d\n",$h,$DUPLICATES{$h});
}
} elsif ($data eq 'dup clear') {
%DUPLICATES=();
} else {
$s->print(<<"_EOF_");
unknown command: $data
l : list status
v : dump status array
d : debug (cpu speed calc)
r : dump unix revisions
delete HOST : forget about HOST
dup show : show duplicates stat
dup clear : clear duplicates stat
to exit use ^D
_EOF_
}
}
# Send $msg to every connected management console; when running in the
# foreground without syslog it is printed to standard output as well.
sub mgmt_print_all {
    my ($msg)=@_;
    print $msg if $options{'foreground'} && !$options{'syslog'};
    $_->print($msg) for values %mgmt_sockets;
}
#-----------------------------------------------------------
# CLP handler 'LSOF': run lsof in a detached grandchild process and send each
# output line containing $pattern (every line when $pattern is undefined) back
# over $socket as a record of 16 bit big-endian length plus data.
# The daemon closes its own copy of the socket and returns 1; on a failed
# fork it warns and returns nothing.
sub clp_rx_LSOF {
my ($socket,$pattern)=@_;
my $pid=fork;
unless (defined $pid) {
warn"$!\n";
return;
}
unless ($pid) {
# first child: fork again and exit, so that the daemon's wait below returns
# immediately
my $pid=fork;
defined $pid or die "$!\n";
unless ($pid) {
# grandchild: does the actual work with a blocking socket
$socket->blocking(1);
open P,'lsof|' or die "$!\n";
while (<P>) {
# plain substring match, no regular expression
next if defined $pattern && index($_,$pattern)<0;
$socket->send(pack('n',length($_)).$_,0);
}
close P;
close $socket;
exit;
}
exit;
}
# daemon: the grandchild owns the connection now
close $socket;
wait;
return 1;
}
# Run @cmd in a separate process and relay its output over $socket.
#
# The daemon forks a relay process and returns. The relay forks once more: the
# inner child execs @cmd with stdout and stderr connected to two pipes, the
# relay reads both pipes and forwards what it gets as records of 16 bit
# big-endian length plus data, where the first data byte tells the channel:
#   'O' stdout data, 'E' stderr data, 'X' followed by the wait status ($?)
# The 'X' record is sent after both pipes have reached end of file.
# The relay dies when nothing arrives for 60 seconds.
# NOTE(review): the daemon does not wait for the relay process here; whether
# it is reaped elsewhere (e.g. by a SIGCHLD handler) is not visible in this
# part of the file - confirm.
sub run_cmd {
my ($socket,@cmd)=@_;
my $pid=fork;
unless (defined $pid) {
warn"$!\n";
return;
}
unless ($pid) {
# relay process
my $opipe=new IO::Pipe;
my $epipe=new IO::Pipe;
my $cpid=fork;
defined $cpid or die "$!\n";
unless ($cpid) {
# command process: write ends of the pipes become stdout and stderr
warn "exec ".join(' ',@cmd)."\n";
$opipe->writer();
$epipe->writer();
open STDIN,'<','/dev/null';
open STDOUT,'>&',$opipe;
open STDERR,'>&',$epipe;
exec @cmd;
die "$!\n";
}
# $::SIG{'CHLD'}=sub {
# my $pid=wait;
# my $buffer="X$?";
# $socket->send(pack('n',length($buffer)).$buffer,0);
# exit;
# };
$opipe->reader();
$epipe->reader();
my $ofn=$opipe->fileno;
my $efn=$epipe->fileno;
my ($rvec_in,$wvec_in,$evec_in)=('','','');
vec($rvec_in,$ofn,1)=1;
vec($rvec_in,$efn,1)=1;
# number of pipes that are still open
my $channel=2;
my $buffer;
$socket->blocking(1);
while (1) {
# select modifies its arguments, so work on copies of the bit vectors
my ($rvec,$wvec,$evec)=($rvec_in,$wvec_in,$evec_in);
my $ready=select($rvec,$wvec,$evec,60);
$ready or die "timeout\n";
if (vec($rvec,$ofn,1)) {
# 1024 minus channel byte minus length prefix
my $len=$opipe->sysread($buffer,1024-1-2);
defined $len or die "$!\n";
if ($len) {
$socket->send(pack('n',$len+1)."O$buffer",0);
} else {
# end of file on stdout: stop watching this pipe
vec($rvec_in,$ofn,1)=0;
$opipe->close;
$channel--;
}
}
if (vec($rvec,$efn,1)) {
my $len=$epipe->sysread($buffer,1024-1-2);
defined $len or die "$!\n";
if ($len) {
$socket->send(pack('n',$len+1)."E$buffer",0);
} else {
# end of file on stderr: stop watching this pipe
vec($rvec_in,$efn,1)=0;
$epipe->close;
$channel--;
}
}
if ($channel==0) {
# both pipes closed: collect the command's status and report it
my $pid=wait;
my $buffer="X$?";
$socket->send(pack('n',length($buffer)).$buffer,0);
exit;
}
}
}
}
#----------- CLP cluster protocol -----------------------------
our $CLP_PORT=235;
our $clp_listen_socket;
our %clp_sockets;
our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF);
# Open the TCP listen socket of the cluster protocol on $CLP_PORT (dies with
# the system error on failure) and register clp_connect_request() as its
# reader.
sub clp_init {
    $clp_listen_socket = IO::Socket::INET->new(
        LocalPort => $CLP_PORT,
        Proto     => 'tcp',
        Listen    => 1,
        ReuseAddr => 1,
    );
    die "$!\n" unless defined $clp_listen_socket;
    Donald::Select::reader($clp_listen_socket,\&clp_connect_request);
}
# Reader callback for the CLP listen socket: accept one connection, make it
# non-blocking, register clp_receive() as its reader together with a fresh
# receive buffer, and remember the socket in %clp_sockets.
#
# accept() may fail (for example when the peer has already gone away); in that
# case the callback returns instead of calling a method on an undefined value.
sub clp_connect_request {
    # keep listening, whatever happens to this particular connection
    Donald::Select::reader_requeue();
    my $socket=$clp_listen_socket->accept() or return;
    $socket->blocking(0);
    my $buffer='';
    Donald::Select::reader($socket,\&clp_receive,$socket,\$buffer);
    $clp_sockets{$socket}=$socket;
}
# Reader callback for one CLP connection.
#
# Appends newly received bytes to the per-connection buffer ($bufref) and
# passes every complete record (16 bit big-endian length plus payload) to
# clp_message(). When clp_message() returns true the handler has taken over
# and closed the socket, so reading stops.
#
# Whenever the connection ends - receive error, end of stream, or hand-over to
# a handler - its entry is removed from %clp_sockets. (The entry used to be
# deleted from %mgmt_sockets, the console's table, so %clp_sockets kept every
# connection forever.)
sub clp_receive {
    my ($s,$bufref)=@_;
    my $data;
    unless (defined $s->recv($data,$TCP_MAX) and length($data)) {
        # error or end of stream: forget and close the connection
        delete $clp_sockets{$s};
        $s->close;
        return;
    }
    $$bufref.=$data;
    while (1) {
        last if length($$bufref)<2;
        my $l=unpack('n',$$bufref);
        last if length($$bufref)<2+$l;      # record not complete yet
        my $msg=substr($$bufref,2,$l);
        $$bufref=substr($$bufref,2+$l);
        if (clp_message($s,$msg)) {
            # handler closed the socket; no further reads on it
            delete $clp_sockets{$s};
            return;
        }
    }
    Donald::Select::reader_requeue();
}
# Verify and decode one CLP record and dispatch it through %CLP_HANDLER with
# the socket as first argument. Returns the handler's result; returns a false
# value when no password is loaded, when verification/decoding yields nothing
# or when the handler name is unknown.
sub clp_message {
    my ($socket,$data)=@_;
    return unless defined $CLUSTER_PW;
    my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return;
    return unless exists $CLP_HANDLER{$handler_name};
    return $CLP_HANDLER{$handler_name}->($socket,@args);
}
# clp_send_message( $socket, @args )
# Encode and sign @args with the cluster password and send them over $socket
# as one record (16 bit big-endian length plus data). Returns nothing while no
# password is loaded and 0 when the socket has no peer.
sub clp_send_message {
    my ($s,@args)=@_;
    return unless defined $CLUSTER_PW;
    my $signed = sign($CLUSTER_PW,encode(@args));
    return 0 unless $s->peername;
    $s->send(pack('n',length($signed)).$signed);
}
# CLP handler 'CMD': start the requested command via run_cmd(), close the
# daemon's copy of the socket and return 1 to signal that the connection has
# been handed over.
sub clp_rx_CMD {
    my $socket = shift;
    run_cmd($socket,@_);
    close($socket);
    return 1;
}
#----------------------------------------------------------
#our $CLUSTER_PW;
#our $CLUSTER_PW_FILE='/etc/clusterd.password';
#our $OLD_CLUSTER_PW_FILE='/root/clusterd.password';
#our $CLUSTER_PW_TIMESTAMP=0;
# Keep $CLUSTER_PW in sync with the cluster password file.
#
# If only the old password file exists, its contents are first copied to the
# new location and the old file is removed. The password is (re)read whenever
# none is loaded or the file's mtime differs from the one remembered in
# $CLUSTER_PW_TIMESTAMP; it is cleared when the file is missing or unreadable.
# Each normal run schedules the next one in 60 seconds.
# Returns true if a password is loaded afterwards.
# NOTE(review): the two early `return undef` paths in the upgrade branch leave
# without scheduling the next run - confirm that this is intended.
sub sync_cluster_pw {
my $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE);
# upgrade : move cluster password file from /root to /etc
if (!$st && -e $OLD_CLUSTER_PW_FILE) {
warn "upgrading cluster password file location $OLD_CLUSTER_PW_FILE -> $CLUSTER_PW_FILE\n";
my $in=new IO::File $OLD_CLUSTER_PW_FILE,'<';
unless (defined $in) {warn "$OLD_CLUSTER_PW_FILE: $!\n";return undef;}
# the new file is created readable and writable by its owner only
my $out=new IO::File $CLUSTER_PW_FILE,O_WRONLY|O_CREAT,0600;
unless (defined $out) {warn "$CLUSTER_PW_FILE: $!\n";return undef;}
my $data;
# at most 1024 bytes are copied
$in->read($data,1024);
$out->write($data);
$in->close;
$out->close;
$st=Donald::FileInfo->lstat($CLUSTER_PW_FILE);
defined $st or die "$CLUSTER_PW_FILE: $!\n";
unlink $OLD_CLUSTER_PW_FILE;
}
if ($st) {
if (!defined $CLUSTER_PW or $CLUSTER_PW_TIMESTAMP != $st->mtime) {
my $fh=new IO::File $CLUSTER_PW_FILE,'<';
if (defined ($fh)) {
# only announce a change, not the initial load
defined $CLUSTER_PW and warn "update cluster password\n";
$fh->read($CLUSTER_PW,1024);
$CLUSTER_PW_TIMESTAMP=$st->mtime;
} else {
# only complain when a password had been loaded before
defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n";
$CLUSTER_PW=undef;
}
}
} else {
defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n";
$CLUSTER_PW=undef;
}
Donald::Select::timeout(60,\&sync_cluster_pw);
return defined $CLUSTER_PW;
}
#------------------------------------------------------------
# area routing
our %AREA_ROUTER=
(
lol => '141.14.31.255',
# orkrist=> '10.14.0.255',
);
our $area_socket;
# If this host is listed in %AREA_ROUTER, open the area UDP socket on
# $UDP_PORT+1 (dies with the system error on failure) and register
# area_message() as its reader. Other hosts do nothing.
sub init_area {
    return unless exists $AREA_ROUTER{$my_hostname};
    warn "I am area router for $AREA_ROUTER{$my_hostname}\n";
    $area_socket = IO::Socket::INET->new(
        Proto     => 'udp',
        LocalPort => $UDP_PORT+1,
    ) or die "$!\n";
    Donald::Select::reader($area_socket,\&area_message,$area_socket);
}
# Reader callback of the area router socket: re-send every received datagram
# unchanged to the broadcast address configured for this host in %AREA_ROUTER,
# on $UDP_PORT.
#
# A failed recv() is reported and skipped; the reader is re-queued in either
# case. (The sender address used to be unpacked into variables nobody read,
# which made the daemon die when recv() returned undef.)
sub area_message {
    my ($area_socket)=@_;
    my $data;
    if (defined $area_socket->recv($data,$UDP_MAX)) {
        $donald_s->send_data($AREA_ROUTER{$my_hostname},$UDP_PORT,$data); # broadcast to our network
    } else {
        warn "area router: recv failed: $!\n";
    }
    Donald::Select::reader_requeue();
}
# Encode and sign @args once and send the packet to every area router (the
# keys of %AREA_ROUTER) on $UDP_PORT+1. Does nothing while no cluster password
# is loaded.
sub udp_broadcast_message {
    my ($donald_s,@args)=@_;
    return unless defined $CLUSTER_PW;
    my $packet = sign($CLUSTER_PW,encode(@args));
    $donald_s->send_data($_,$UDP_PORT+1,$packet) for keys %AREA_ROUTER;
}
# Return the area router table as "router => address" pairs joined by commas.
sub area_config_as_string {
    my @pairs;
    for my $router (keys %AREA_ROUTER) {
        push @pairs, "$router => $AREA_ROUTER{$router}";
    }
    return join(',',@pairs);
}
#------------------------------------------------------------
our $PROG_FILE; # saved $0 - may be relative path
our $PROG_MTIME;
# Watch the program file for changes. The first call remembers the file name
# ($0) and its mtime; later calls compare the mtime and, when it has changed,
# re-exec the program with '--daemon' plus the foreground/syslog options in
# effect. Schedules itself again in 60 seconds; returns without rescheduling
# when the file cannot be stat'ed.
sub check_progfile_status {
    $PROG_FILE = $0 unless defined $PROG_FILE;
    my @st = lstat $PROG_FILE or return;
    if (!defined $PROG_MTIME) {
        $PROG_MTIME = $st[9];
    } elsif ($st[9] != $PROG_MTIME) {
        warn "progfile $PROG_FILE has changed - upgrade restart from version ".version_info()."\n";
        my @flags = ('--daemon');
        push @flags,'--foreground' if $options{'foreground'};
        push @flags,'--syslog' if $options{'syslog'};
        exec $PROG_FILE,@flags;
    }
    Donald::Select::timeout(60,\&check_progfile_status);
}
# Return the version string, e.g. 'V1.31 - 20090617-155314': the revision
# followed by the program file's mtime as local time, or by '?' while that
# mtime is not known yet.
sub version_info {
    my $stamp = '?';
    if (defined $PROG_MTIME) {
        my ($sec,$min,$hour,$mday,$mon,$year) = localtime($PROG_MTIME);
        $stamp = sprintf '%4d%02d%02d-%02d%02d%02d',
            $year+1900, $mon+1, $mday, $hour, $min, $sec;
    }
    return "V$REVISION - ".$stamp;
}
#------------------------------------------------------------
# Write a debug snapshot to /var/log/pload_debug.txt: the reason $why with a
# time stamp, the process samples and the figures derived from them, followed
# by the output of uptime, ps, /proc/stat and /proc/swaps.
#
# The header is written and the file closed before the external commands
# append to it. Previously the header was still buffered on a handle opened
# with '>' while the commands ran, so it was flushed afterwards at offset 0
# and overwrote the beginning of their output.
sub pload_debug {
    my ($why)=@_;
    open my $out,'>','/var/log/pload_debug.txt' or return;
    print {$out} "$why at ".scalar(localtime),"\n";
    my $running_proc=running_proc();
    my $running_proc_10=running_proc_10();
    printf {$out} "RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc;
    printf {$out} "RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10;
    $CPUS or init_cpuinfo();
    if ($CPUS) {
        printf {$out} "run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",
            $running_proc,$CPUS,$BOGOMIPS,$running_proc/$CPUS,
            $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS;
        printf {$out} "run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",
            $running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,
            $running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS;
    }
    # flush and close first: the commands below append to the same file
    close $out;
    system "uptime>>/var/log/pload_debug.txt";
    system "ps -AlfT>>/var/log/pload_debug.txt";
    system "cat /proc/stat>>/var/log/pload_debug.txt";
    system "cat /proc/swaps>>/var/log/pload_debug.txt";
}
# Timer callback (re-queues itself for 600 seconds): warn when the ten minute
# average of running processes exceeds 150% of the CPU count. Does nothing
# else while the CPU count is unknown.
# The average is computed once; an unused second copy has been removed.
sub check_overload() {
    Donald::Select::timeout_requeue(600);
    $CPUS or return;
    my $pload_10=running_proc_10()/$CPUS;
    $pload_10>1.5 and warn(sprintf("pload>150%% (%3.1f%%)\n",$pload_10*100));
}
# Timer callback (re-queues itself for 86400 seconds): send a single ping to
# the host 'mlx' and warn when the ping command reports failure.
sub ping_mlx {
    Donald::Select::timeout_requeue(86400);
    my $status = system 'ping','-c','1','mlx';
    warn "failed to ping mlx\n" if $status;
}
#####################################################
my $slave=0;
my $exit_value=0;
# expand_netgroup_hosts( @netgroups )
# Resolve the given NIS netgroups, including nested ones, through
# `ypmatch NAME netgroup` and return the host names found in their
# "(host,user,domain)" triples. Entries that are neither a triple nor a
# netgroup name are reported and skipped.
#
# Every netgroup is expanded only once (tracked in %DID), so netgroups that
# include each other cannot cause an endless loop and a netgroup reachable
# over several paths does not contribute its hosts twice.
sub expand_netgroup_hosts {
    my (@netgroups)=@_;
    my %DID=();
    my @out=();
    while (@netgroups) {
        my $ng=pop @netgroups;
        next if $DID{$ng}++;        # already expanded
        for my $entry (split ' ',`ypmatch $ng netgroup`) {
            if ($entry=~/^\((\S+),.*,.*\)$/) {
                push @out,$1;
            } elsif ($entry=~/^[a-z0-9_]+$/i) {
                # nested netgroup: expand it later
                push @netgroups,$entry;
            } else {
                warn "ignored entry $entry\n";
            }
        }
    }
    return @out;
}
# exec_at( $host, @cmd )
# Client side of the CLP 'CMD' request: connect to clusterd on $host, send the
# command and run the select loop with cmd_rx() as reader for the answer.
# Dies when no cluster password is available or the connection fails.
sub exec_at {
    my ($host,@cmd)=@_;
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    my $s = IO::Socket::INET->new(PeerAddr=>$host,PeerPort=>$CLP_PORT)
        or die "$host: $!\n";
    clp_send_message($s,'CMD',@cmd);
    # packet buffer plus one partial-line buffer each for stdout and stderr
    my ($pbuffer,$olbuffer,$elbuffer) = ('','','');
    Donald::Select::reader($s,\&cmd_rx,$host,$s,\$pbuffer,\$olbuffer,\$elbuffer);
    $slave = 1;
    Donald::Select::run() if $slave;
}
# lsof( $pattern )
# Client side of the CLP 'LSOF' request: send the request to every host of the
# netgroup AMD_SHORT (sorted, without 'tux') and run the select loop with
# lsof_rx() as reader for each connection. Hosts that cannot be reached are
# reported and skipped. Dies when no cluster password is available.
sub lsof {
    my ($pattern)=@_;
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    my @hosts = grep { $_ ne 'tux' }
                sort { $a cmp $b } expand_netgroup_hosts('AMD_SHORT');
    for my $host (@hosts) {
        my $s = IO::Socket::INET->new(PeerAddr=>$host,PeerPort=>$CLP_PORT);
        unless (defined $s) {
            warn "$host: $!\n";
            next;
        }
        clp_send_message($s,'LSOF',$pattern);
        # packet buffer and partial-line buffer of this connection
        my ($pbuffer,$olbuffer) = ('','');
        Donald::Select::reader($s,\&lsof_rx,$host,$s,\$pbuffer,\$olbuffer);
        $slave++;
    }
    Donald::Select::run() if $slave;
}
# Socket reader callback for connections opened by lsof().
#
# Arguments: $host     - peer name, used as output prefix
#            $s        - connected socket
#            $pbufref  - ref to the buffer of received, not yet decoded bytes
#            $olbufref - ref to the buffer holding an unfinished output line
#
# Incoming data is a sequence of frames: a 16-bit big-endian length followed
# by that many payload bytes. Every complete frame is passed to lsof_msg.
# At end of stream an unfinished (newline-less) last line is printed instead
# of being dropped, the socket is closed, and the program exits with
# $exit_value once no connection is left.
sub lsof_rx {
    my ($host,$s,$pbufref,$olbufref)=@_;
    my $data;
    defined $s->recv($data,$TCP_MAX*2) or return;
    if (!length($data)) {
        # End of stream: flush what is left of the last line.
        if (length ${$olbufref}) {
            print "$host ${$olbufref}\n";
            ${$olbufref}='';
        }
        close $s;
        --$slave;
        exit $exit_value unless $slave;
        return;
    }
    ${$pbufref}.=$data;
    # Decode as many complete frames as the buffer holds.
    while (length(${$pbufref})>=2) {
        my $len=unpack('n',${$pbufref});
        last if length(${$pbufref})<2+$len;    # payload not complete yet
        my $msg=substr(${$pbufref},2,$len);
        substr(${$pbufref},0,2+$len,'');       # consume header and payload
        lsof_msg($host,$s,$msg,$olbufref);
    }
    Donald::Select::reader_requeue();
}
# Print the payload of one LSOF reply frame, line by line.
#
# Arguments: $host     - peer name; every printed line is prefixed "$host "
#            $s        - socket the data came from (unused here)
#            $data     - payload of the frame
#            $olbufref - ref to the text of an unfinished line carried over
#                        from previous frames; updated in place
#
# Only newline-terminated lines are printed; whatever follows the last
# newline is stored back into $$olbufref for the next call.
sub lsof_msg {
    my ($host,$s,$data,$olbufref)=@_;
    my $pending=${$olbufref}.$data;
    # Length of the leading part that consists of complete lines.
    my $complete_len=rindex($pending,"\n")+1;
    my $complete=substr($pending,0,$complete_len);
    print "$host $_" for $complete=~/([^\n]*\n)/g;
    ${$olbufref}=substr($pending,$complete_len);
}
# Socket reader callback for the connection opened by exec_at().
#
# Arguments: $host     - peer name, used as output prefix
#            $s        - connected socket
#            $pbufref  - ref to the buffer of received, not yet decoded bytes
#            $olbufref - ref to the buffer holding an unfinished stdout line
#            $elbufref - ref to the buffer holding an unfinished stderr line
#
# Incoming data is a sequence of frames: a 16-bit big-endian length followed
# by that many payload bytes. Every complete frame is passed to cmd_msg.
# At end of stream unfinished (newline-less) last lines of both channels are
# printed instead of being dropped, the socket is closed, and the program
# exits with $exit_value once no connection is left.
sub cmd_rx {
    my ($host,$s,$pbufref,$olbufref,$elbufref)=@_;
    my $data;
    defined $s->recv($data,$TCP_MAX*2) or return;
    if (!length($data)) {
        # End of stream: flush what is left of the last lines.
        if (length ${$olbufref}) {
            print "$host: ${$olbufref}\n";
            ${$olbufref}='';
        }
        if (length ${$elbufref}) {
            print STDERR "$host: ${$elbufref}\n";
            ${$elbufref}='';
        }
        close $s;
        --$slave;
        exit $exit_value unless $slave;
        return;
    }
    ${$pbufref}.=$data;
    # Decode as many complete frames as the buffer holds.
    while (length(${$pbufref})>=2) {
        my $len=unpack('n',${$pbufref});
        last if length(${$pbufref})<2+$len;    # payload not complete yet
        my $msg=substr(${$pbufref},2,$len);
        substr(${$pbufref},0,2+$len,'');       # consume header and payload
        cmd_msg($host,$s,$msg,$olbufref,$elbufref);
    }
    Donald::Select::reader_requeue();
}
# Handle the payload of one CMD reply frame.
#
# Arguments: $host     - peer name; every printed line is prefixed "$host: "
#            $s        - socket the data came from (unused here)
#            $data     - frame payload; its first character selects the channel
#            $olbufref - ref to the unfinished stdout line, updated in place
#            $elbufref - ref to the unfinished stderr line, updated in place
#
# Channels:  'X' - remaining payload is a number; non-zero sets $exit_value to 1
#            'O' - text to print on standard output
#            'E' - text to print on STDERR
# Frames with any other channel are ignored. Only newline-terminated lines
# are printed; the rest is kept in the channel's buffer for the next frame.
sub cmd_msg {
    my ($host,$s,$data,$olbufref,$elbufref)=@_;
    my $channel=substr($data,0,1);
    if ($channel eq 'X') {
        $exit_value=1 if substr($data,1)!=0;
        return;
    }
    # channel => [ line buffer, output routine ]
    my %sink=(
        'O'=>[$olbufref,sub { print @_ }],
        'E'=>[$elbufref,sub { print STDERR @_ }],
    );
    my $entry=$sink{$channel} or return;
    my ($tailref,$emit)=@{$entry};
    my $text=${$tailref}.substr($data,1);
    # Length of the leading part that consists of complete lines.
    my $complete_len=rindex($text,"\n")+1;
    my $complete=substr($text,0,$complete_len);
    $emit->("$host: $_") for $complete=~/([^\n]*\n)/g;
    ${$tailref}=substr($text,$complete_len);
}
#------------------------------------------------------------
# Usage text, printed (via die) when the command line cannot be parsed or
# selects no action.
# The first line is built from an interpolating string so that $0 expands to
# the program name; the option list stays in a non-interpolating
# here-document because it contains a literal '@'.
use constant USAGE => "usage: $0 [options]\n" . <<'__EOF__';
--push file # broadcast this file
--push-amd-tar # broadcast /etc/amd
--send-restart # broadcast a restart request to all nodes
--exec mkmotd # execute /usr/sbin/mkmotd.pl on all nodes
--exec @node cmd [args...] # execute cmd on node
--flush-gidcache # flush rpc auth.unix.gid cache on all nodes
--lsof=pattern
--kill # try to kill a running server
--daemon # start a daemon
--kill # try to kill previous server first
--foreground # stay in foreground, log to stderr
--syslog # log to syslog instead of stderr
__EOF__
use Getopt::Long;
# Parse the command line into %options; every option gets its own slot
# (note: --push-amd-tar is stored under the key 'push_amd_tar').
# An unparsable command line prints the usage text and terminates.
unless (GetOptions(
    # one-shot client actions
    'push=s'         => \$options{push},
    'push-amd-tar'   => \$options{push_amd_tar},
    'send-restart'   => \$options{'send-restart'},
    'flush-gidcache' => \$options{'flush-gidcache'},
    'exec=s'         => \$options{exec},
    'lsof=s'         => \$options{lsof},
    # server control
    'daemon'         => \$options{daemon},
    'kill'           => \$options{kill},
    'foreground'     => \$options{foreground},
    'syslog'         => \$options{syslog},
)) {
    die USAGE;
}
# Command dispatch. Exactly one action is run; the options are tested in this
# order: --push, --exec, --push-amd-tar, --send-restart, --flush-gidcache,
# --daemon, --lsof, --kill. Without any of them the usage text is printed.
if (defined $options{'push'}) {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=Donald::Select::INET->new(Proto=>'udp') or die "$!\n";
    push_file($donald_s,$options{'push'});
} elsif (defined $options{'exec'}) {
    if (substr($options{'exec'},0,1) eq '@') {
        # "--exec @node cmd [args...]": run cmd on one node.
        length($options{'exec'})>1 or die USAGE;    # a node name must follow '@'
        @ARGV>=1 or die USAGE;                      # and a command is required
        exec_at(substr($options{'exec'},1),@ARGV);
    } else {
        # "--exec name": broadcast the request to all nodes.
        sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
        $donald_s=Donald::Select::INET->new(Proto=>'udp') or die "$!\n";
        send_exec($donald_s,$options{'exec'});
    }
} elsif (defined $options{'push_amd_tar'}) {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=Donald::Select::INET->new(Proto=>'udp') or die "$!\n";
    push_amd_tar($donald_s);
} elsif (defined $options{'send-restart'}) {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=Donald::Select::INET->new(Proto=>'udp') or die "$!\n";
    udp_broadcast_message($donald_s,'restart');
} elsif (defined $options{'flush-gidcache'}) {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=Donald::Select::INET->new(Proto=>'udp') or die "$!\n";
    udp_broadcast_message($donald_s,'flush-gidcache');
} elsif (defined $options{'daemon'}) {
    # With --kill, stop a previous server first and give it two seconds.
    $options{'kill'} and Donald::Tools::kill_previous_server('clusterd') and sleep 2;
    $SIG{PIPE}='IGNORE';
    $donald_s=Donald::Select::INET->new(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n";
    $donald_s->receive_data(\&udp_message,$donald_s);
    unless ($options{'foreground'}) {
        # Go to the background: the parent exits, the child carries on.
        my $pid=fork;
        defined $pid or die "$!\n";
        $pid and exit 0;
    }
    if ($options{'syslog'} or not $options{'foreground'}) {
        # Route warn/die to syslog and detach the standard streams.
        openlog('clusterd','pid','daemon');
        Sys::Syslog::setlogsock('unix'); # with 'native' we get EOLs in the logfile, option "noeol" doesn't work
        $SIG{__WARN__} = sub { syslog('warning',@_); };
        $SIG{__DIE__} = sub { syslog('crit',@_);syslog('crit','exiting');exit 1;};
        open (STDOUT,'>','/dev/null');
        open (STDERR,'>','/dev/null');
        open (STDIN,'<','/dev/null');
    }
    check_progfile_status();
    warn "server started - ".version_info()."\n";
    init_area();
    mgmt_init();
    clp_init();
    # In daemon mode a failing password sync is only reported, not fatal.
    sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n";
    Donald::Select::timeout(60,\&purge_old_receiver);
    Donald::Select::timeout(rand(60),\&send_stat);    # random first delay below 60
    Donald::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha;
    # Host-specific services.
    $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init();
    $my_hostname eq 'lol' and My::NetlogReceiver::init();
    Donald::Select::timeout(600,\&check_overload);
    Donald::Select::timeout(30,\&ping_mlx);
    Donald::Select::run();
} elsif (defined $options{'lsof'} and length $options{'lsof'}) {
    # Tested with defined/length so that a pattern of "0" is accepted too.
    lsof($options{'lsof'});
} elsif ($options{'kill'}) {
    Donald::Tools::kill_previous_server('clusterd');
} else {
    die USAGE;
}