#! /usr/bin/perl
# Extracted from the git diff "a/clusterd/clusterd -> b/clusterd/clusterd"
# (new file mode 100755, index 0000000..510cfb7, hunk @@ -0,0 +1,1941 @@).

use warnings;
use strict;

# https://github.molgen.mpg.de/mariux64/clusterd

our $REVISION='1.110';

#use lib ('/home/buczek/cluster/Donald/blib/lib');

use Donald::Tools qw(encode sign check_sign decode);
use Donald::FileInfo;
use POSIX;
use IO::Pipe;
use Digest::MD5;

#-------------------------------------
# My::Select - small single-process event loop: one-shot timers and
# one-shot read/write/except watches multiplexed with select().
package My::Select;

# Loop clock; refreshed at the top of every run() iteration.
our $time=Donald::Tools::uptime();

# Return the loop clock as sampled by the current run() iteration.
sub My::Select::time
{
    return $time;
}

our @TIMER=();          # ( [duetime,cb] , ... ) sorted by time
our $active_timer_cb;   # timer callback currently executed by run(), else undef

# cb=Select::timeout(seconds,cb)
# Schedule cb to be called once, `seconds` after the current loop time.
sub timeout
{
    my ($delta,$cb)=@_;
    my $duetime=$time+$delta;
    @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER );
    return $cb;
}

# cb=Select::timeout_cancel(cb)
# Remove every pending timer that would call cb.
sub timeout_cancel
{
    my ($cb)=@_;
    @TIMER=grep {$_->[1] != $cb} @TIMER;
    return $cb;
}

# Select::timeout_requeue(seconds)
# Only meaningful inside a timer callback: schedule that callback again.
sub timeout_requeue {timeout(shift,$active_timer_cb); }

#------------------------------------

our @READER;    # ( [Handle,cb] , ... )
our @WRITER;
our @EXCEPT;
our $active_io; # [Handle,cb] of the watch whose callback is running

# cb = Select::reader(Handle,cb)
# Call cb once when Handle becomes readable.
sub reader
{
    my ($handle,$cb)=@_;
    push @READER,[$handle,$cb];
    return $cb;
}

# cb = Select::writer(Handle,cb)
# Call cb once when Handle becomes writable.
sub writer
{
    my ($handle,$cb)=@_;
    push @WRITER,[$handle,$cb];
    return $cb;
}

# cb = Select::except(Handle,cb)
# Call cb once when Handle reports an exceptional condition.
sub except
{
    my ($handle,$cb)=@_;
    push @EXCEPT,[$handle,$cb];
    return $cb;
}

# Only meaningful inside an I/O callback: watch the same handle with the
# same callback again.
sub reader_requeue {push @READER,$active_io}
sub writer_requeue {push @WRITER,$active_io}
sub except_requeue {push @EXCEPT,$active_io}

# $cb = Select::cancel(cb)
# Remove all watches that would call cb. Without an argument the callback
# of the currently running watch is used. Returns the callback, as the
# header comment always promised (the previous code returned a count).
sub cancel
{
    my ($cb)=@_;
    defined $cb or $cb=$active_io && $active_io->[1];
    defined $cb && $cb or return;
    @READER=grep {$_->[1] != $cb} @READER;
    @WRITER=grep {$_->[1] != $cb} @WRITER;
    @EXCEPT=grep {$_->[1] != $cb} @EXCEPT;
    return $cb;
}

# Remove all watches registered for $handle.
sub cancel_handle
{
    my ($handle)=@_;
    @READER=grep {$_->[0] != $handle} @READER;
    @WRITER=grep {$_->[0] != $handle} @WRITER;
    @EXCEPT=grep {$_->[0] != $handle} @EXCEPT;
}

# Return the watch entries whose handle still has a file descriptor.
# A handle that was closed meanwhile reports an undefined fileno; feeding
# that to vec() would silently watch descriptor 0.
sub _open_entries
{
    return grep { defined $_->[0]->fileno } @_;
}

# Run the callback of the first entry of @$list whose descriptor is set in
# $vec. The entry is removed before the call (watches are one-shot) and is
# published in $active_io so the callback can requeue or cancel itself.
sub _dispatch_first
{
    my ($list,$vec)=@_;
    for (my $i=0;$i<@$list;$i++) {
        my $fd=$list->[$i]->[0]->fileno;
        next unless defined $fd && vec($vec,$fd,1);
        $active_io=splice @$list,$i,1;
        $active_io->[1]->();
        $active_io=undef;
        last;
    }
}

# Main loop, never returns. Each iteration runs all due timers, then waits
# up to one second in select() and serves at most one ready reader, one
# ready writer and one ready except watch.
sub run
{
    while (1) {
        $time=Donald::Tools::uptime();
        while (@TIMER && $TIMER[0]->[0]<=$time) {
            $active_timer_cb=(shift @TIMER)->[1];
            $active_timer_cb->();
        }
        $active_timer_cb=undef;

        # forget watches on handles that were closed behind our back
        @READER=_open_entries(@READER);
        @WRITER=_open_entries(@WRITER);
        @EXCEPT=_open_entries(@EXCEPT);

        my ($rvec,$wvec,$evec)=('','','');

        for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 }
        for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 }
        for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 }

        my $ready=select($rvec,$wvec,$evec,1);
        if ($ready>0) {
            # a callback may close handles that are still listed in the
            # other queues, so _dispatch_first re-checks fileno itself
            _dispatch_first(\@READER,$rvec);
            _dispatch_first(\@WRITER,$wvec);
            _dispatch_first(\@EXCEPT,$evec);
        }
    }
}

#--------------------------------------
# My::Select::INET - UDP datagram helper object plus asynchronous TCP
# helpers, all driven by the My::Select event loop.
package My::Select::INET ;

use Carp;
use IO::Socket::INET;

our $UDP_MAX=1472; # for broadcast on alphas

# Socket option numbers. Taken from the Socket module (loaded by
# IO::Socket::INET) so they are right on every platform; the former
# literals 1 and 4 are the Linux/x86 values only.
our $SOL_SOCKET=Socket::SOL_SOCKET();
our $SO_ERROR=Socket::SO_ERROR();

# Return the pending error number (SO_ERROR) of socket $s. If the option
# cannot be read, the errno of the failed getsockopt() is returned instead.
sub get_socket_error
{
    my ($s)=@_;
    my $packed=getsockopt($s,$SOL_SOCKET,$SO_ERROR);
    defined $packed or return $!+0;
    return unpack('i',$packed);
}

# My::Select::INET->new( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT )
# Arguments are passed to IO::Socket::INET->new. Returns an object that is
# a blessed reference to the socket, or undef when the socket can't be
# created. The socket lives in a lexical: the former package variable made
# all objects share (and overwrite) one socket.
sub new
{
    my ($class,@args)=@_;
    my $socket=IO::Socket::INET->new(@args) or return undef;
    return bless \$socket,$class;
}

# $self->send_data($ip,$port,$data)
# Send one datagram. Returns undef (after carp) when $ip does not resolve
# or $data exceeds $UDP_MAX; a failing send only produces a carp.
sub send_data
{
    my ($self,$ip,$port,$data)=@_;
    my $ip_address=inet_aton($ip);
    unless (defined $ip_address) {carp("can't resolve $ip\n");return undef}
    unless (length($data)<=$UDP_MAX) {carp("message to long\n");return undef}
    $$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n";
}

# $self->reader($sub): call $sub once when the socket becomes readable.
sub reader
{
    my ($self,$sub)=@_;
    My::Select::reader($$self,$sub);
}

# $self->receive_data($cb)
# Install a persistent receiver: for every datagram $cb is called with
# ($data,$peer_address,$peer_port). Additional arguments are accepted but
# not used.
sub receive_data
{
    my ($self,$cb,@args)=@_;

    my $receive_data_cb=sub {
        my $data;
        my $peer = $$self->recv($data,$UDP_MAX);
        My::Select::reader_requeue();
        # a failed recv returns no peer address; unpack_sockaddr_in would
        # die on it and take the whole daemon down
        defined $peer && length $peer or return;
        my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer);
        my $udp_peer_addr=inet_ntoa($peer_iaddr);
        $cb->($data,$udp_peer_addr,$udp_peer_port);
    };
    My::Select::reader($$self,$receive_data_cb);
}

# send_tcp($socket,$data,$timeout,$cb);
#
# send data asynchronously over noblocking tcp socket
# call callback when done ($!=0) or on error ($! set)
#
# all arguments required
#
sub send_tcp {
    my ($s,$data,$timeout,$cb)=@_;
    my $len=$s->send($data,0);
    defined $len or return $cb->();
    if ($len==length($data)) {
        $!=0;
        $cb->();
        return;
    }
    my $pos=$len;   # number of bytes already sent
    my $cb_tmo=sub {
        My::Select::cancel_handle($s);
        $!=110;     # presumably ETIMEDOUT (its Linux value)
        $cb->();
    };
    my $cb_write=sub {
        My::Select::timeout_cancel($cb_tmo);
        my $len=send($s,substr($data,$pos),0);
        defined $len or return $cb->();
        if ($len==length($data)-$pos) {
            $!=0;
            $cb->();
            return;
        }
        # partial write: restart the timeout and wait for more room
        $pos+=$len;
        My::Select::timeout($timeout,$cb_tmo);
        My::Select::writer_requeue();
    };
    My::Select::timeout($timeout,$cb_tmo);
    My::Select::writer($s,$cb_write);
}

# $socket = connect_tcp ($ip,$port,$timeout,$cb)
#
# asynchronously connect to tcp socket.
# call callback when done or on error (with $! set)
#
# all arguments required
#
sub connect_tcp {
    my ($ip,$port,$timeout,$cb)=@_;

    my $s=IO::Socket::INET->new(PeerAddr=>$ip,PeerPort=>$port,Blocking=>0);
    defined $s or return $cb->();
    my $cb_tmo=sub {
        My::Select::cancel_handle($s);
        $!=110;     # presumably ETIMEDOUT (its Linux value)
        $cb->();
    };
    my $cb_write=sub {
        # writable means the connect attempt finished; SO_ERROR tells how
        My::Select::timeout_cancel($cb_tmo);
        $!=get_socket_error($s);
        $cb->();
    };
    My::Select::timeout($timeout,$cb_tmo);
    My::Select::writer($s,$cb_write);
    return $s;
}

# read_with_timeout($socket,$callback,$timeout)
#
# asynchronously read from tcp socket.
# The callback gets the data read (up to 102400 bytes, '' at end of file)
# with $!=0, or undef with $! set on error or timeout.
#
sub read_with_timeout {
    my ($s,$cb,$timeout)=@_;
    my $cb_tmo=sub {
        My::Select::cancel_handle($s);
        $!=110;     # presumably ETIMEDOUT (its Linux value)
        $cb->(undef);
    };
    my $cb_read=sub {
        My::Select::timeout_cancel($cb_tmo);
        my $buf='';
        my $l=sysread($s,$buf,102400,0);
        if (!defined $l) {
            $cb->(undef);
        } else {
            $!=0;
            $cb->($buf);
        }
    };
    My::Select::timeout($timeout,$cb_tmo);
    My::Select::reader($s,$cb_read)
}

#--------------------------------------
package My::Cluster::Updown;

#
# monitor nodes
#

our %H; # ( name=> [state , last_seen , expect_seq , @data ] , ...
# ... )  -- end of the %H record-layout comment that starts on the preceding line
our $MONITOR_STARTING=1;    # true during the recovery phase after start

our %DUPLICATES;            # ( hostname => number of duplicate announcements )

# data currently:
# 0: float load-average
# 1: clusterd version
# 2: processor load
# 3: processor capacity
# 4: string unix version
# (stored behind state, last_seen and expect_seq, i.e. at $H{name}->[3..7])

use Storable;

# Persist the node table. A failure is reported but must not kill the
# daemon: this runs from a timer inside the event loop.
sub save_state {
    eval { store \%H,'/root/clusterd.monitor.state'; 1 }
        or warn "/root/clusterd.monitor.state: cannot save monitor state: $@";
}

# Load the node table saved by save_state() and reset all last_seen
# timestamps. A missing or unreadable state file leaves the table as is.
sub restore_state {
    my $file='/root/clusterd.monitor.state';
    if (-e $file) {
        my $saved=eval { retrieve($file) };
        if (ref($saved) eq 'HASH') {
            %H=%$saved;
        } else {
            warn "$file: cannot restore monitor state\n";
        }
    }
    for (values %H) {
        $_->[1]=0;
    }
}

# status($f): print the monitor overview to handle object $f: UP and DOWN
# node lists, the ten highest load averages, the ten highest CPU loads,
# the ten largest free capacities and the total free capacity.
sub status {
    my ($f)=@_;
    my (@up,@down);
    for (sort keys %H) { push @up, $_ if $H{$_}->[0] eq 'UP' }
    for (sort keys %H) { push @down,$_ if $H{$_}->[0] eq 'DOWN' }

    $f->printf("UP (%d): %s\n\n",scalar(@up),join(' ',@up));
    $f->printf("DOWN (%d): %s\n\n",scalar(@down),join(' ',@down));

    my (@k,$max);

    @k=sort({$H{$b}->[3] <=> $H{$a}->[3]} grep({$H{$_}->[0] eq 'UP'} (keys %H)));
    $max=0;
    $f->print("TOP 10: ");
    for (@k) {
        $f->print(sprintf "%s (%4.2f) ",$_,$H{$_}->[3]);
        last if ++$max>=10;     # was $max++>=10, which printed 11 entries
    }
    $f->print("\n\n");

    @k=sort({($H{$b}->[5]||0) <=> ($H{$a}->[5]||0)} grep({$H{$_}->[0] eq 'UP'} (keys %H)));
    $max=0;
    $f->print("TOP 10 CPU load : ");
    for (@k) {
        $f->print(sprintf "%s (%3.1f%%) ",$_,($H{$_}->[5]||0)*100);
        last if ++$max>=10;     # was $max++>=10, which printed 11 entries
    }
    $f->print("\n\n");

    @k=sort({($H{$b}->[6]||0) <=> ($H{$a}->[6]||0)} grep({$H{$_}->[0] eq 'UP'} (keys %H)));
    $max=0;
    my $total_bogomips=0;
    $f->print("TOP 10 free capacity : ");
    for (@k) {
        # no early exit here: the total is summed over all UP nodes
        my $bogo=$H{$_}->[6]||0;
        $f->print(sprintf "%s (%3.1f) ",$_,$bogo) if $max++<10;
        $total_bogomips+=$bogo
    }
    $f->printf("\ntotal available bogomips: %d\n\n",$total_bogomips);
}

# Log $text via warn and send it, prefixed with the local time, to all
# management consoles.
sub msg_text {
    my ($text)=@_;
    warn($text."\n");
    my $time=scalar(localtime);
    main::mgmt_print_all($time.': '.$text."\n");
}

# Log a node state transition in a fixed column layout.
sub msg_state {
    my ($state_old,$state_new,$hostname,$extra)=@_;
    msg_text(sprintf "%-4s -> %-4s %-20s %s",$state_old,$state_new,$hostname,$extra);
}

# Start the node monitor: restore the saved table and stay in recovery
# mode for 630 seconds before the periodic timeout check begins.
sub init {
    restore_state();
    msg_text('node monitor: started. (recovery mode)');
    My::Select::timeout(630,sub{$MONITOR_STARTING=0;msg_text('node monitor: recovery finished');});
    My::Select::timeout(630,\&timeout_hosts);
}

# Timer callback (every 60 seconds): mark UP nodes as DOWN when they were
# not heard of for 1230 seconds, then save the table.
sub timeout_hosts {
    My::Select::timeout_requeue(60);
    my $timeout=My::Select::time()-1230; # 2x10 minutes + 30 seconds

    for (keys %H) {
        my $h=$H{$_};
        if ($h->[0] eq 'UP' && $h->[1]<=$timeout) {
            msg_state('UP','DOWN',$_,"timeout!");
            $h->[0]='DOWN';
        }
    }
    save_state();
}

# Process one announcement of a node.
#
# ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)
#
# $seq is compared with the sequence number expected for that node to
# report reboots (seq 0), lost packets and duplicates. Afterwards the node
# is UP, last_seen is the loop time and the next expected seq is $seq+1.
sub rx_hostannounce {
    my ($host,$seq,@more)=@_;
    my ($load_average,$version,$pload,$pcapacity,$unixrev)=@more;
    defined $version or $version='?';   # only used in messages
    $unixrev ||= '?';
    unless (exists $H{$host}) {
        if ($seq==0) {
            msg_state('NEW','UP',$host,"discovered new node $version - $unixrev");
        } else {
            $MONITOR_STARTING or msg_state('NEW','UP',$host,"discovered running node $version - $unixrev");
        }
        $H{$host}=['UP',My::Select::time(),$seq+1,@more];
    } else {
        my $h=$H{$host};
        if ($h->[0] eq 'UP') {
            if ($seq == $h->[2]) {
                ;   # exactly the announcement we expected
            } else {
                if ($seq==0) {
                    # msg_state('UP','UP',$host,"node rebootet $version - $unixrev");

                } elsif ($seq>$h->[2]) {
                    $MONITOR_STARTING or msg_state('UP','UP',$host,$seq-$h->[2]." packet(s) lost!");
                } elsif ($seq+1==$h->[2]) {
                    warn "DUPLICATE from $host\n";
                    $DUPLICATES{$host}=($DUPLICATES{$host}||0)+1;
                } else {
                    msg_state('UP','UP',$host,"node rebooted and ".$seq." packet(s) lost!");
                }
            }
        } else {
            if ($seq==0) {
                msg_state('DOWN','UP',$host,"node rebooted $version - $unixrev");
            } elsif ($seq==$h->[2]) {
                msg_state('DOWN','UP',$host,"sequence error. Node to slow? (seq=$seq,exp=".$h->[2].")");
            } else {
                msg_state('DOWN','UP',$host,$seq-$h->[2]." packet(s) lost");
            }
        }
        @$h=('UP',My::Select::time(),$seq+1,@more);
    }
}

# Remove $host from the node table; silent when the host is unknown.
sub delete_host {
    my ($host)=@_;
    my $h=delete $H{$host} or return;
    msg_text("host $host removed from monitor");
}

#-----------------------------------------------------------------------

# My::NetlogReceiver - accept TCP connections on port 1028 and log the
# length-prefixed messages received on them.
package My::NetlogReceiver;

our $listen_socket;
our $TCP_MAX=1024;      # maximum number of bytes taken per recv

our $DAY_LAST_MSG;      # YYYYMMDD of the last day marker written

# Return the local date as YYYYMMDD.
sub day {
    my @f=localtime;
    return sprintf "%04d%02d%02d",$f[5]+1900,$f[4]+1,$f[3];
}

# Write a marker line to the log once per calendar day.
sub bigben {
    my $day=day();
    $day le $DAY_LAST_MSG and return;
    warn "NETLOG ==================================================== morning has broken ====\n";
    $DAY_LAST_MSG=$day;
}

# Timer callback: check for a day change every 30 seconds.
sub bigben_timer {
    bigben();
    My::Select::timeout_requeue(30);
}

sub bigben_init {
    $DAY_LAST_MSG=day();
    My::Select::timeout(30,\&bigben_timer);

}

# Reader callback of one connection: append received data to the buffer
# and log every complete message (16 bit big-endian length, then payload).
# On error or end of stream the watch is simply not renewed.
sub receive {
    my ($socket,$peernode,$bufref)=@_;
    my $data;

    bigben();

    defined $socket->recv($data,$TCP_MAX) or return;
    # length $data or warn "$peernode: disconnect\n";
    length $data or return;

    $$bufref.=$data;

    while (1) {
        last if length($$bufref)<2;
        my $l=unpack('n',$$bufref);
        # warn "wait for $l+2 bytes got ".length($$bufref)."\n";
        last if length($$bufref)<2+$l;
        my $msg=substr($$bufref,2,$l);
        $$bufref=substr($$bufref,2+$l);
        $|=1;
        warn "NETLOG $msg\n" unless $msg=~/NETLOG/;
    }
    My::Select::reader_requeue();
}

# Reader callback of the listen socket: accept one connection and start
# receiving on it.
sub connect_request {
    My::Select::reader_requeue();

    # accept can fail (e.g. the peer is already gone); calling a method
    # on the undefined result would kill the daemon
    my $socket=$listen_socket->accept() or return;
    $socket->blocking(0);

    my $peernode=$socket->peerhost;
    my $buffer='';
    My::Select::reader($socket,sub{receive($socket,$peernode,\$buffer)});
    # warn "$peernode: connect\n";
}

# Open the listen socket (tcp port 1028) and start the day marker timer.
# Dies when the socket can't be created.
sub init {
    $listen_socket=IO::Socket::INET->new(Proto=>'tcp',LocalPort=>1028,Listen=>1,ReuseAddr=>1) or die "$!\n";
    My::Select::reader($listen_socket,\&connect_request);
    bigben_init();
}

#------------------------------------------------------------------------
package main;
use strict;
use IO::File;
use Sys::Syslog;
use IO::Socket::INET;
use Data::Dumper;

our $UDP_MAX=1472; # for broadcast on alphas
our $UDP_PORT=234;
our $BC_RATE=8; # packets per second broadcast
our $TCP_TIMEOUT=30; # default timeout for tcp processing

our (%options); # RUN OPTIONS

our $donald_s; # My::Select::INET udp socket

our $my_hostname;
our $my_ip; # '141.14.12.12'
$my_hostname=lc `/bin/hostname`;
chomp($my_hostname);
$my_hostname =~ s/\.molgen\.mpg\.de$//;

# Wait until our own hostname resolves. The warning is printed once only;
# the flag used to be declared inside the loop and was therefore reset on
# every pass.
{
    my $warned;
    while (1) {
        my $addr=inet_aton($my_hostname);
        if (defined $addr) {
            $my_ip=inet_ntoa($addr);
        }
        last if defined $my_ip;
        unless ($warned) {
            warn "no IP (yet)= - waiting\n";
            $warned=1;
        }
        sleep 30;
    }
}

our $my_unixrev;
$my_unixrev=`uname -r`;
chomp($my_unixrev);

our $CLUSTER_PW;
our $CLUSTER_PW_FILE='/etc/clusterd.password';
our $OLD_CLUSTER_PW_FILE='/root/clusterd.password';
our $CLUSTER_PW_TIMESTAMP=0;

$ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}:''); # for ps , tar (gnu!)

#---------------------------------------------------------- UDP

our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234)

# message name => handler; the handler gets the decoded message arguments
our %UDP_HANDLER=
(
    'filedata'       => \&udp_rx_filedata,
    'filedata.2'     => \&udp_rx_filedata,
    'amdtardata'     => \&udp_rx_amdtardata,
    'loadavg.2'      => \&udp_rx_loadavg2,
    'restart'        => \&udp_rx_restart,
    'flush-gidcache' => \&udp_rx_flush_gidcache,
    'make-automaps'  => \&udp_rx_make_automaps,
    'log'            => \&udp_rx_log,
    'exec'           => \&udp_rx_exec,
    'push'           => \&udp_rx_push,
);

# udp_message($data,$peer_addr,$peer_port,$donald_s)
# Handle one received datagram: remember the sender in $udp_peer_addr /
# $udp_peer_port, check the signature with the cluster password, decode
# the message and call the handler registered for its name. Messages that
# don't verify or have no handler are dropped silently.
sub udp_message {
    my ($data,$x_udp_peer_addr,$x_udp_peer_port,$donald_s)=@_;

    ($udp_peer_addr,$udp_peer_port)=($x_udp_peer_addr,$x_udp_peer_port);

    defined $CLUSTER_PW or return;

    my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return;
    defined $handler_name or return;
    $UDP_HANDLER{$handler_name}->(@args) if exists $UDP_HANDLER{$handler_name};
}

# udp_send_message( dst, @args)
# dst='141.14.31.255' 'zork' '141.14.16.1' etc. data is anything
# Encode and sign @args and send them as one datagram to dst. Does
# nothing while no cluster password is known.
sub udp_send_message {
    my ($ip,@args)=@_;
    defined $CLUSTER_PW or return;
    $donald_s->send_data($ip,$UDP_PORT,sign($CLUSTER_PW,encode(@args)));
}

#----------------------------------------------------------

# push_amd_tar($donald_s)
# Pack /etc/amd into /tmp/amd.tar, compress it with gzip and broadcast
# /tmp/amd.tar.gz in 1024 byte 'amdtardata' messages, each carrying the
# MD5 digest of the uncompressed tar. After every $BC_RATE packets the
# sender sleeps for a second.
sub push_amd_tar {
    my ($donald_s)=@_;
    my $filename='/tmp/amd.tar';

    my $pid=fork;
    defined $pid or return warn "$!\n";
    unless($pid) {
        chdir '/etc/amd' or die "/etc/amd: $!\n";
        exec 'tar','cf',$filename,'.';
        die "$!\n";
    }
    waitpid($pid,0);    # wait for exactly this child, not for any child
    $? and return;

    my $fh=IO::File->new($filename,'<') or return warn "$filename: $!\n";
    my $digest=Digest::MD5->new->addfile($fh)->digest;
    warn "tar digest is ",Digest::MD5::md5_hex($digest),"\n";

    $pid=fork;
    defined $pid or return warn "$!\n";
    unless($pid) {
        exec 'gzip','-f',$filename;
        die "$!\n";
    }
    waitpid($pid,0);
    $? and return;

    $filename='/tmp/amd.tar.gz';

    my $st=Donald::FileInfo->lstat($filename);
    defined $st or return warn "$filename: $!\n";
    $st->type eq 'F' or return warn "$filename: not a plain file\n";

    $fh=IO::File->new($filename,'<') or return warn "$filename: $!\n";

    my $i=0;
    for (my $pos=0;$pos<$st->size;$pos+=1024) {
        my $data;
        defined $fh->sysread($data,1024) or return warn "$filename: $!\n";
        # warn "send bytes $pos to ",$pos+length($data),"\n";
        ++$i % $BC_RATE or sleep 1;
        udp_broadcast_message($donald_s,'amdtardata',$st,$pos,$data,$digest);
    }
}

# push_file($donald_s,$filename)
# Broadcast a plain file (at most 65536 bytes, in 1024 byte 'filedata'
# messages) or a symbolic link (one 'filedata.2' message). $filename must
# be absolute. Dies for oversized files and unsupported file types.
sub push_file {
    my ($donald_s,$filename)=@_;

    $filename =~ m"^/" or return warn "$filename: please use absolute path\n";

    my $st=Donald::FileInfo->lstat($filename);
    defined $st or return warn "$filename: $!\n";
    my $rpc;
    if ($st->type eq 'F') {
        $rpc='filedata';
        $st->size<=65536 or die "$filename: to big for broadcast (max 65536 bytes)\n";
        if ($st->size==0) {
            udp_broadcast_message($donald_s,$rpc,$st,0,'');
            return;
        }

        my $fh=IO::File->new($filename,'<') or return warn "$filename: $!\n";
        my $i=0;
        for (my $pos=0;$pos<$st->size;$pos+=1024) {
            my $data;
            defined $fh->sysread($data,1024) or return warn "$filename: $!\n";
            # warn "send bytes $pos to ",$pos+length($data),"\n";
            udp_broadcast_message($donald_s,$rpc,$st,$pos,$data);
            ++$i % $BC_RATE or sleep 1;
        }
    } elsif ($st->type eq 'L') {
        $rpc='filedata.2';
        udp_broadcast_message($donald_s,$rpc,$st,0,'');
        return;
    } else {
        die "file type not supported\n";
    }

}

# commands that may be started remotely: name => shell command line
our %CMD=(
    'mkmotd'=>'/usr/sbin/mkmotd.pl',
);

# send_exec($donald_s,$cmd): broadcast the request to run the command
# registered as $cmd in %CMD. Dies with the list of names otherwise.
sub send_exec {
    my ($donald_s,$cmd)=@_;
    unless (exists $CMD{$cmd}) {
        die "available commands: ",join(' , ',keys %CMD),"\n";
    }
    udp_broadcast_message($donald_s,'exec',$cmd);
}

# Handler for 'exec': run the command registered as $cmd in a detached
# grandchild with all standard handles on /dev/null and a 60 second alarm.
sub udp_rx_exec {
    my ($cmd)=@_;

    warn "exec $cmd\n";
    exists $CMD{$cmd} or return;

    my $pid;
    $pid=fork;
    unless (defined $pid) {
        warn "$!\n";
        return;
    }
    unless ($pid) {
        # intermediate child: fork again and leave, so the command is
        # not a child of the daemon
        $pid=fork;
        defined $pid or exit 1;
        $pid and exit;

        open STDIN,'<','/dev/null';
        open STDOUT,'>','/dev/null';
        open STDERR,'>','/dev/null';
        alarm(60);
        chdir '/';
        exec '/bin/sh','-c',$CMD{$cmd};
        exit 1;
    }
    # reap the intermediate child only; a bare wait could pick up (or
    # block on) an unrelated child of the daemon
    waitpid($pid,0);
}

#-------------------------------------------------------------

# normalize_seg([pos,len],[pos,len],...)
# Return the segments sorted by position with overlapping or adjacent
# segments merged. The passed array references are reused and modified.
sub normalize_seg {
    my @s=sort {$a->[0] <=> $b->[0]} @_;

    my $i=0;
    while ($i<$#s) {
        # is element $i joinable with next element

        my $end_0=$s[$i]->[0]+$s[$i]->[1];
        if ($end_0 >= $s[$i+1]->[0] ) {
            my $end_1=$s[$i+1]->[0]+$s[$i+1]->[1];
            $s[$i]->[1] = ($end_0>$end_1 ? $end_0 : $end_1)-$s[$i]->[0];
            splice @s,$i+1,1;   # stay on $i: it may join the next one too
        } else {
            $i++;
        }
    }
    return @s;
}

my %RECEIVER; # ( filename => $receiver, .... )

# $receiver : [ st_want , last_rx , io_handle , [ [pos,len] , [pos,len] , ...
# ] ]  -- end of the $receiver layout comment that starts on the preceding line

# Timer callback (every 60 seconds): forget transfers that got no data
# for more than 10 seconds and report them to the stat target.
# NOTE(review): the condition and the warn statement were garbled in the
# text this was taken from and are reconstructed - confirm against the
# repository.
sub purge_old_receiver {
    while (my ($n,$v)=each %RECEIVER) {
        if ($v->[1]+10<My::Select::time()) {
            warn "timeout receiving ",$v->[0]->name,"\n";
            log_to_stat_target('timeout receiving ',$v->[0]->name);
            delete $RECEIVER{$n};
        }
    }
    My::Select::timeout_requeue(60);
}

#-------------------------------------------------------------

our $INSTALLED_DIGEST='';   # digest of the amd tar installed last

our $rx_filedata_done;      # set by udp_rx_filedata: file was installed by this call

# Handler for 'amdtardata': receive one piece of the compressed amd tar.
# When the file is complete it is unpacked in /etc/amd, $digest is
# remembered as installed and /sbin/make-automaps is run. Pieces of an
# already installed digest are ignored.
sub udp_rx_amdtardata {
    my ($st_want,$pos,$data,$digest)=@_;

    ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo';

    ### $digest eq $INSTALLED_DIGEST and $pos==0 and warn "/etc/amd - ",Digest::MD5::md5_hex($digest)," already installed\n";
    $digest eq $INSTALLED_DIGEST and return;

    #### $pos==0 and warn "receiving /etc/amd - ",Digest::MD5::md5_hex($digest),"\n";

    udp_rx_filedata($st_want,$pos,$data);
    # Everything below belongs to the completed transfer only. It used to
    # run for every packet, where a bare wait could reap an unrelated
    # child and report an installation that never happened.
    $rx_filedata_done or return;

    my $pid=fork;
    defined $pid or return warn "$!\n";
    unless($pid) {
        chdir '/etc/amd' or die "/etc/amd: $!\n";
        exec 'tar','xzf',$st_want->name;
        die "$!\n";
    }
    waitpid($pid,0);
    $? and return;

    warn "installed /etc/amd - ",Digest::MD5::md5_hex($digest),"\n";
    $INSTALLED_DIGEST=$digest;
    system '/sbin/make-automaps';
}

our ($machine,$SYS_lchown,$SYS_mknod,$lmtime_sub);
our ($SYS_utimensat,$AT_FDCWD,$UTIME_OMIT,$AT_SYMLINK_NOFOLLOW);

# Fallback for architectures without known utimensat numbers.
sub lmtime_unsupported {
    my ($path,$mtime)=@_;
    warn "$path: don't known how to change symlink mtime on target architecture\n";
}

# Set the mtime of $path itself (symlinks are not followed) via the
# utimensat system call; the atime entry is marked with $UTIME_OMIT.
sub lmtime_utimensat {
    my ($path,$mtime)=@_;
    my $tsa=pack 'qqqq',0,$UTIME_OMIT,$mtime,0;
    syscall($SYS_utimensat,$AT_FDCWD,$path,$tsa,$AT_SYMLINK_NOFOLLOW)==0 or return warn "$path: failed to lmtime: $!\n";
}
$lmtime_sub=\&lmtime_unsupported;

# system call numbers and constants per machine type
chomp($machine=`uname -m`);
if ($machine eq 'i686') {
    $SYS_lchown=198;            # __NR_lchown32 in /usr/include/asm/unistd.h
    $SYS_mknod=14;              # __NR_mknod
} elsif ($machine eq 'x86_64') {
    $SYS_lchown=94;             # __NR_lchown in /usr/include/asm-x86_64/unistd.h
    $SYS_mknod=133;             # __NR_mknod

    $SYS_utimensat=280;         # /usr/include/asm/unistd_64.h
    $AT_FDCWD=-100;             # /usr/include/fcntl.h
    $UTIME_OMIT=(1<<30)-2;      # /usr/include/bits/stat.h
    $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/fcntl.h
    $lmtime_sub=\&lmtime_utimensat;
} elsif ($machine eq 'alpha') {
    $SYS_lchown=208;            # SYS_lchown in /usr/include/syscall.h
    $SYS_mknod=14;              # SYS_mknod
} elsif ($machine eq 'amd64') {
    $SYS_lchown=254;            # SYS_lchown in /usr/include/syscall.h
    $SYS_mknod=14;              # SYS_mknod
} elsif ($machine eq 'ppc64le') {
    $SYS_lchown=16;             # __NR_lchown in /usr/include/powerpc64le-linux-gnu/asm/unistd.h
    $SYS_mknod=14;              # __NR_mknod

    $SYS_utimensat=304;         # __NR_utimensat in /usr/include/powerpc64le-linux-gnu/asm/unistd.h
    $AT_FDCWD=-100;             # /usr/include/linux/fcntl.h
    $UTIME_OMIT=(1<<30)-2;      # /usr/include/powerpc64le-linux-gnu/bits/stat.h
    $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/linux/fcntl.h
    $lmtime_sub=\&lmtime_utimensat;
} else {
    warn "unknown machine type $machine: symlink ownership can't be set.\n";
    warn "unknown machine type $machine: named pipes,character and block devices can't be created\n";
}

# lchown($uid,$gid,$path): change the owner of $path without following a
# symlink. Does nothing when the system call number is unknown.
sub lchown {
    my ($uid,$gid,$path)=@_;
    $SYS_lchown or return;
    syscall($SYS_lchown,$path,$uid+0,$gid+0)==0 or return warn "$path: failed to lchown: $!\n";
}

# lmtime($mtime,$path): set the mtime of $path without following a symlink.
sub lmtime {
    my ($mtime,$path)=@_;
    $lmtime_sub or return;
    $lmtime_sub->($path,$mtime);
}

# Give the completely written $tmp_filename owner and mode of $st_want,
# move it to $filename and set the wanted mtime. Returns true on success,
# false (after a warning) on failure.
sub _install_received_file {
    my ($st_want,$st_is,$tmp_filename,$filename)=@_;
    unless (chown $st_want->uid,$st_want->gid,$tmp_filename) {
        warn "$tmp_filename: $!\n";
        return 0;
    }
    unless (chmod $st_want->perm,$tmp_filename) {
        warn "$tmp_filename: $!\n";
        return 0;
    }
    $st_is and unlink($filename);
    unless (rename($tmp_filename,$filename)) {
        warn "rename $tmp_filename $filename: $!\n";
        return 0;
    }
    utime($st_want->mtime,$st_want->mtime,$filename);
    warn "installed $filename\n";
    return 1;
}

# Handler for 'filedata' / 'filedata.2': receive one piece of a file.
#
#   $st_want  file info object of the file as it should be
#   $pos      offset of $data in the file
#   $data     file content starting at $pos
#
# Symlinks are (re)created directly. File data is collected in
# "<name>.tmp"; the file is installed when all bytes are there.
# Sets $rx_filedata_done as a side effect: 1 when this call installed the
# file, 0 otherwise.
sub udp_rx_filedata {
    my ($st_want,$pos,$data)=@_;

    ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo';

    my $filename=$st_want->name;
    my $tmp_filename="$filename.tmp";

    $rx_filedata_done=0;

    my $st_is=Donald::FileInfo->lstat($st_want->name);
    if ($st_is && $st_is->type eq 'F' && $st_is->size==$st_want->size && $st_is->mtime==$st_want->mtime) {
        #### $pos==0 and warn " $filename seems to be current\n";
        return;
    }

    if ($st_want->type eq 'L') {
        if (!$st_is || $st_is->type ne 'L' || $st_is->target ne $st_want->target) {
            $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n");
            symlink($st_want->target,$filename) or return warn "$filename: failed to create symlink: $!\n";
            lchown($st_want->uid,$st_want->gid,$filename);
            lmtime($st_want->mtime,$filename);
            warn "installed $filename -> ".$st_want->target."\n";
        } else {
            # link is fine, only fix its meta data
            if ($st_is->uid != $st_want->uid || $st_is->gid != $st_want->gid) {
                lchown($st_want->uid,$st_want->gid,$filename);
            }
            if ($st_is->mtime != $st_want->mtime) {
                lmtime($st_want->mtime,$filename);
            }
        }
        return;
    }

    if (length($data) == $st_want->size) {
        # complete file in one broadcast
        # O_TRUNC: a stale, longer tmp file must not leave its tail behind
        my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_TRUNC,0);
        defined $fh or return warn "$tmp_filename: $!\n";
        # syswrite returns 0 for an empty file, which is not an error
        defined $fh->syswrite($data) or return warn "$tmp_filename: $!\n";
        $fh->close or return warn "$tmp_filename: $!\n";
        _install_received_file($st_want,$st_is,$tmp_filename,$filename) or return 1;
        $rx_filedata_done=1;
        return;
    }

    length($data) or return; # shouldn't happen.

    my $receiver=$RECEIVER{$st_want->name};

    if (defined $receiver) {
        # another version of the file is on its way now: start over
        if ( $receiver->[0]->size != $st_want->size or $receiver->[0]->mtime != $st_want->mtime ) {
            $receiver=undef;
        }
    }

    unless (defined $receiver) {
        # create new receiver
        ## warn "start receiving $filename from $udp_peer_addr\n";
        -e $tmp_filename and unlink($tmp_filename);
        my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0);
        defined $fh or return warn "$tmp_filename: $!\n";
        $receiver = [$st_want,My::Select::time(),$fh,[]];
        $RECEIVER{$filename}=$receiver;
    }

    {
        # warn "$filename: receive $pos length ",length($data),"\n";

        # write data ( size cant be 0 here )
        $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n";
        $receiver->[2]->syswrite($data) or return warn "$tmp_filename: $!\n";
        $receiver->[1]=My::Select::time();
        my $s=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])];

        #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n";

        # all there ?

        if (@$s == 1 && $s->[0]->[0]==0 && $s->[0]->[1]==$st_want->size) {
            $receiver->[2]->close;
            _install_received_file($st_want,$st_is,$tmp_filename,$filename) or return 1;
            delete $RECEIVER{$filename};
            $rx_filedata_done=1;
        }
    }
}

#----------------------------------------------------------- sample running proc every 10 seconds

our @proc_running=(0)x12;       # 12 samples every 5 seconds -> minute average
our @proc_running_10=(0)x10;    # 10 samples every minute -> 10 minute average
our $SAMPLE_TICK=0;

# Return the procs_running value of /proc/stat, 0 if it can't be read.
sub xget_cur_running_proc {
    open my $stat,'<','/proc/stat' or return 0;
    while (my $line=<$stat>) {
        # warn $line;
        return $1 if $line=~/^procs_running\s*(\d+)/;
    }
    return 0;
}

# Like xget_cur_running_proc, but reports values above $CPUS+1 through
# pload_debug().
sub get_cur_running_proc {
    our $CPUS;
    my $r=xget_cur_running_proc();
    if ($CPUS && $r>$CPUS+1) {
        pload_debug("more than $CPUS+1 proc");
    }
    return $r;
}

# Mean of the samples in @proc_running.
sub running_proc {
    my $ret=0;
    $ret+=$_ for @proc_running;
    return $ret/@proc_running;
}

# Mean of the samples in @proc_running_10.
sub running_proc_10 {
    my $ret=0;
    $ret+=$_ for @proc_running_10;
    return $ret/@proc_running_10;
}

# Timer callback (every 5 seconds): shift a new sample into @proc_running
# (the sampling process itself is not counted) and, on every 12th call,
# the current minute average into @proc_running_10. The former counter
# logic did that on every 13th call only.
sub sample_rproc {
    @proc_running=(@proc_running[1..@proc_running-1],get_cur_running_proc()-1);
    if (++$SAMPLE_TICK>=12) {
        @proc_running_10=(@proc_running_10[1..@proc_running_10-1],running_proc());
        $SAMPLE_TICK=0;
    }
    My::Select::timeout_requeue(5);
}

#----------------------------------------------------------- stat

our ($CPUS,$BOGOMIPS);

# Count the bogomips lines of /proc/cpuinfo into $CPUS and add their
# values up in $BOGOMIPS. Nothing is done on alpha.
sub init_cpuinfo {
    Donald::Tools::is_alpha() and return;
    open my $cpuinfo,'<','/proc/cpuinfo' or return;
    while (my $line=<$cpuinfo>) {
        if ($line=~/^bogomips\s*:\s*([\d.]+)/) {$CPUS++;$BOGOMIPS+=$1;}
    }
    close $cpuinfo;
}

# AXP : (system load average) , LINUX: (system load average, pload, freebogo)
# On Linux the load average is the first number of /proc/loadavg, pload
# is the mean of @proc_running per CPU and freebogo the idle share of
# $BOGOMIPS. Returns undef when the load can't be determined.
sub loadavg {
    if (Donald::Tools::is_alpha()) {
        my $data=pack 'l!3lx4l!3';
        my $i=syscall(85,3,0,$data,1,length($data)); # table(id=TBL_LOADAVG,index=0,addr=data,nel=1,lel=56)
        my ($l0,$l1,$l2,$scale,$mach0,$mach1,$mach2)=unpack 'l!3lx4l!3',$data;
        $scale or return undef;
        return $l0/$scale;
    } else {
        $CPUS or init_cpuinfo();
        my $running_proc=running_proc();
        open my $fh,'<','/proc/loadavg' or return undef;
        my $data;
        sysread($fh,$data,1024);
        close $fh;
        defined $data or return undef;
        $data =~ /^(\d+\.?\d*)/ or return undef;
        if ($CPUS) {
            return ($1+0,($running_proc)/$CPUS,($running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS));
        } else {
            return $1+0;
        }
    }
}

our $STAT_TARGET='lol';     # host that collects the announcements
our $STAT_SEQ=0;            # sequence number of the next announcement

# Timer callback (every 600 seconds): announce our load to $STAT_TARGET.
sub send_stat {
    My::Select::timeout_requeue(600);
    my ($load_avg,$pload,$pcapacity)=loadavg();
    defined $load_avg or return;
    udp_send_message($STAT_TARGET,'loadavg.2',$my_hostname,$STAT_SEQ++,$load_avg,version_info(),$pload,$pcapacity,$my_unixrev);
}

# Handler for 'loadavg.2': feed the node monitor if we are the stat target.
sub udp_rx_loadavg2 {
    my ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)=@_;
    $my_hostname eq $STAT_TARGET and My::Cluster::Updown::rx_hostannounce($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)
}

# Handler for 'log': pass the text to the monitor log.
sub udp_rx_log {
    my ($msg)=@_;
    My::Cluster::Updown::msg_text($msg);
}

# Send the concatenated arguments, prefixed with our hostname, as 'log'
# message to $STAT_TARGET.
sub log_to_stat_target {
    my ($msg)=join '',@_;
    udp_send_message($STAT_TARGET,'log',"$my_hostname: $msg");
}

# ----------------------------------------------------------

# Handler for 'restart': start '/usr/sbin/clusterd --kill --daemon'.
sub udp_rx_restart {
    # double-fork, because kill_previous_server() won't kill its parent
    my $pid=fork;
    defined $pid or return warn "fork: $!\n";
    if ($pid==0) {
        my $pid2=fork;
        # the test used to look at $pid, which is 0 in both processes, so
        # the intermediate child exec'd the new daemon as well
        if (defined $pid2 && $pid2==0) {
            exec '/usr/sbin/clusterd','--kill','--daemon';
            warn "exec failed: $!\n";
            exit 1;
        }
        # intermediate child: must not return into the event loop
        exit(defined $pid2 ? 0 : 1);
    }
    waitpid($pid,0);    # the intermediate child leaves at once
}

# Handler for 'flush-gidcache': write the current time to the flush file
# of the auth.unix.gid cache.
sub udp_rx_flush_gidcache {
    if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') {
        print $out time();
    } else {
        warn "proc/net/rpc/auth.unix.gid/flush: $!\n";
    }
}

# Handler for 'make-automaps': flush the gid cache as above, then run
# /sbin/make-automaps.
sub udp_rx_make_automaps {
    udp_rx_flush_gidcache();
    system '/sbin/make-automaps';
}

#----------- tcp mgmt console -----------------------------

our $MGMT_PORT=234;
our $mgmt_listen_socket;

our %mgmt_sockets;      # connected consoles, keyed by the stringified socket

# Open the console listen socket; dies when that fails.
sub mgmt_init {
    $mgmt_listen_socket=IO::Socket::INET->new(LocalPort=>$MGMT_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1);
    defined $mgmt_listen_socket or die "$!\n";
    My::Select::reader($mgmt_listen_socket,\&mgmt_connect_request);
}

# Reader callback of the listen socket: accept a console connection,
# register it and print the greeting.
sub mgmt_connect_request {

    My::Select::reader_requeue();

    # listen socket ready

    # accept can fail; a method call on undef would kill the daemon
    my $socket=$mgmt_listen_socket->accept() or return;
    $socket->blocking(0);

    ### warn "accepted mgmt connection from ",$socket->peerhost,"\n";

    My::Select::reader($socket,sub{mgmt_receive($socket)});
    $mgmt_sockets{$socket}=$socket;
    $socket->print("clusterd ".version_info()." stupid console\n");
    $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n");
}

# Reader callback of a console connection: read one command and run it.
# The connection is closed on error, end of stream or a single ^D.
sub mgmt_receive {
    my ($s)=@_;
    my $data;
    my $len=$s->sysread($data,1024);
    if (!defined $len or $len==0 or $len==1 && $data eq "\cD") {
        delete $mgmt_sockets{$s};
        $s->close;
        ###warn "lost connection to mgmt port\n";
        return;
    }

    My::Select::reader_requeue();
    $data=~s/\r?\n$//;
    length $data or return;
    if ($data eq 'l') {
        $my_hostname eq $STAT_TARGET and My::Cluster::Updown::status($s);
        $s->print("AREA: ",area_config_as_string(),"\n");
        $s->print("STAT TARGET: ",$STAT_TARGET,"\n");
        $s->print(' (',scalar(localtime),')',"\n");
    }
    elsif ($data eq 'r') {
        for my $host (sort keys %H) {
            if ($H{$host}->[0] eq 'UP') {
                $s->printf("%-40s : %s\n",$host,$H{$host}->[7]);
            }
        }
    } elsif ($data eq 'v') {
        $Data::Dumper::Terse=1;
        $Data::Dumper::Indent=0;
        for (sort keys %H) {
            $s->printf("%15s : %s\n",$_,Dumper($H{$_}));
        }
    } elsif ($data eq 'd') {
        my $running_proc=running_proc();
        my $running_proc_10=running_proc_10();
        $s->printf("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc);
        $s->printf("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10);
        $CPUS or init_cpuinfo();
        $CPUS and $s->printf("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS);
        $CPUS and $s->printf("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS);
    } elsif ($data =~ /^delete (\S+)$/) {
        My::Cluster::Updown::delete_host($1);
    } elsif ($data eq 'dup show') {
        for my $h (sort {$DUPLICATES{$b} <=> $DUPLICATES{$a} } keys %DUPLICATES) {
            $s->printf("%-10s : %5d\n",$h,$DUPLICATES{$h});
        }
    } elsif ($data eq 'dup clear') {
        %DUPLICATES=();
    } else {
        $s->print(<<"_EOF_");
unknown command: $data

l : list status
v : dump status array
d : debug (cpu speed calc)
r : dump unix revisions
delete HOST : forget about HOST

dup show : show duplicates stat
dup clear : clear duplicates stat

to exit use ^D

_EOF_
    }
}

# Print $msg on all connected consoles, and on stdout when running in
# the foreground without syslog.
sub mgmt_print_all {
    my ($msg)=@_;

    $options{'foreground'} and not $options{'syslog'} and print $msg;

    for my $s (values(%mgmt_sockets)) {
        $s->print($msg);
    }
}

#-----------------------------------------------------------

# CLP handler 'LSOF': run "lsof -n" (limited by timeout(1)) in a detached
# grandchild and send each output line that contains $pattern - every
# line if no pattern is given - as a length-prefixed record over $socket.
# Returns 1 when the request was taken over, nothing if fork failed.
sub clp_rx_LSOF {
    my ($socket,$pattern)=@_;

    my $pid=fork;
    unless (defined $pid) {
        warn"$!\n";
        return;
    }
    unless ($pid) {
        my $pid=fork;
        defined $pid or die "$!\n";
        unless ($pid) {
            $socket->blocking(1);
            open my $lsof,'-|','timeout','-k','32s','30s','lsof','-n' or die "$!\n";
            while (my $line=<$lsof>) {
                next if defined $pattern && index($line,$pattern)<0;
                $socket->send(pack('n',length($line)).$line,0);
            }
            close $lsof;
            if ($?) {
                my $line=sprintf("** lsof timout/error on %s\n",$my_hostname);
                $socket->send(pack('n',length($line)).$line,0);
            }
            close $socket;
            exit;
        }
        exit;
    }
    close $socket;
    waitpid($pid,0);    # reap the intermediate child, and only that one
    return 1;
}

# run_cmd($socket,@cmd)
# Fork a child that runs @cmd and relays its output over $socket as
# records of 16 bit big-endian length plus payload. The payload starts
# with 'O' (stdout data), 'E' (stderr data) or 'X' (final record, followed
# by the wait status). The relay gives up after 60 seconds without output.
# NOTE(review): the calling process does not wait for the relay child
# here - confirm that it is reaped elsewhere.
sub run_cmd {
    my ($socket,@cmd)=@_;
    my $pid=fork;
    unless (defined $pid) {
        warn"$!\n";
        return;
    }
    unless ($pid) {
        my $opipe=IO::Pipe->new;
        my $epipe=IO::Pipe->new;
        my $cpid=fork;
        defined $cpid or die "$!\n";
        unless ($cpid) {
            warn "exec ".join(' ',@cmd)."\n";
            $opipe->writer();
            $epipe->writer();
            open STDIN,'<','/dev/null';
            open STDOUT,'>&',$opipe;
            open STDERR,'>&',$epipe;
            exec @cmd;
            die "$!\n";
        }
#       $::SIG{'CHLD'}=sub {
#           my $pid=wait;
#           my $buffer="X$?";
#           $socket->send(pack('n',length($buffer)).$buffer,0);
#           exit;
#       };
        $opipe->reader();
        $epipe->reader();
        my $ofn=$opipe->fileno;
        my $efn=$epipe->fileno;
        my ($rvec_in,$wvec_in,$evec_in)=('','','');
        vec($rvec_in,$ofn,1)=1;
        vec($rvec_in,$efn,1)=1;
        my $channel=2;  # number of pipes still open
        my $buffer;
        $socket->blocking(1);
        while (1) {
            my ($rvec,$wvec,$evec)=($rvec_in,$wvec_in,$evec_in);
            my $ready=select($rvec,$wvec,$evec,60);
            $ready or die "timeout\n";
            if (vec($rvec,$ofn,1)) {
                my $len=$opipe->sysread($buffer,1024-1-2);
                defined $len or die "$!\n";
                if ($len) {
                    $socket->send(pack('n',$len+1)."O$buffer",0);
                } else {
                    # end of stdout
                    vec($rvec_in,$ofn,1)=0;
                    $opipe->close;
                    $channel--;
                }
            }
            if (vec($rvec,$efn,1)) {
                my $len=$epipe->sysread($buffer,1024-1-2);
                defined $len or die "$!\n";
                if ($len) {
                    $socket->send(pack('n',$len+1)."E$buffer",0);
                } else {
                    # end of stderr
                    vec($rvec_in,$efn,1)=0;
                    $epipe->close;
                    $channel--;
                }
            }
            if ($channel==0) {
                my $pid=wait;
                my $buffer="X$?";
                $socket->send(pack('n',length($buffer)).$buffer,0);
                exit;
            }
        }
    }
}

#----------- CLP cluster protocol -----------------------------

our $CLP_PORT=235;
our $clp_listen_socket;

# Dispatch table: CLP message type => handler, called as handler($socket,@args).
# clp_receive() stops reading from the socket as soon as a handler returns true.
our %CLP_HANDLER=(
    'CMD'  => \&clp_rx_CMD,
    'LSOF' => \&clp_rx_LSOF,
    'PULL' => \&clp_rx_PULL,
);

# Open the TCP listen socket on $CLP_PORT and register it with the select loop.
# Dies when the socket cannot be created.
sub clp_init {
    $clp_listen_socket=IO::Socket::INET->new(
        LocalPort => $CLP_PORT,
        Proto     => 'tcp',
        Listen    => 128,
        ReuseAddr => 1,
    );
    defined $clp_listen_socket or die "$!\n";
    My::Select::reader($clp_listen_socket,\&clp_connect_request);
}

# Select callback: the listen socket is readable, i.e. a connection is pending.
# Accepts it and registers a reader that feeds clp_receive() with a
# per-connection reassembly buffer.
sub clp_connect_request {

    # keep listening for further connections, whatever happens below
    My::Select::reader_requeue();

    my $socket=$clp_listen_socket->accept();
    unless (defined $socket) {
        # accept can fail (e.g. peer already gone, out of file descriptors);
        # that must not take the whole daemon down
        warn "accept: $!\n";
        return;
    }
    $socket->blocking(0);

    my $buffer='';

    My::Select::reader($socket,sub { clp_receive($socket,\$buffer) });
}

# Select callback for an accepted connection.
#
#   $s      - the connection socket
#   $bufref - reference to the buffer holding not yet complete frames
#
# A frame is a 2-byte big-endian length ('n') followed by that many bytes.
# Every complete frame is passed to clp_message(). The reader is only
# re-registered when no handler claimed the socket and the peer is still
# connected.
sub clp_receive {
    my ($s,$bufref)=@_;
    my $data;
    defined $s->recv($data,$TCP_MAX) or return;
    if (!length($data)) {
        # peer closed the connection
        $s->close;
        return;
    }
    $$bufref.=$data;
    while (length($$bufref)>=2) {
        my $len=unpack('n',$$bufref);
        last if length($$bufref)<2+$len;   # frame not complete yet
        my $msg=substr($$bufref,2,$len);
        $$bufref=substr($$bufref,2+$len);
        clp_message($s,$msg) and return;   # handler took over the socket
    }
    My::Select::reader_requeue();
}

# Verify the signature of one received frame and dispatch it.
#
# Returns the handler's return value; returns false when no cluster password
# is loaded, the signature check/decoding yields nothing, or the message type
# is unknown.
sub clp_message {
    my ($socket,$data)=@_;

    defined $CLUSTER_PW or return;

    my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return;
    defined $handler_name && exists $CLP_HANDLER{$handler_name} or return;
    return $CLP_HANDLER{$handler_name}->($socket,@args);
}

# clp_send_message($socket, @args)
#
# Sign and send one length-prefixed message on a connected socket.
# Returns the result of send(), 0 when the socket has no peer, or nothing
# when no cluster password is loaded.
sub clp_send_message {
    my ($s,@args)=@_;
    defined $CLUSTER_PW or return;
    my $data=sign($CLUSTER_PW,encode(@args));
    $s->peername or return 0;
    return $s->send(pack('n',length($data)).$data);
}

# Handler for 'CMD': hand the socket to run_cmd() and close our copy.
# Returns 1 so that clp_receive() stops reading from the socket.
sub clp_rx_CMD {
    my ($socket,@args)=@_;
    run_cmd($socket,@args);
    close $socket;
    return 1;
}

# send_tcp_cp($socket,$cb,$timeout,@args)
#
# send a cluster protocol message over an async tcp socket.
#
# assume $CLUSTER_PW is valid
#
sub send_tcp_cp {
    # $s       - socket handed to My::Select::INET::send_tcp
    # $cb      - callback handed through to send_tcp
    # $timeout - timeout handed through to send_tcp
    # @args    - message, encoded and signed with $CLUSTER_PW
    my ($s,$cb,$timeout,@args)=@_;
    my $data=sign($CLUSTER_PW,encode(@args));
    My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb);
}
#----------------------------------------------------------

#our $CLUSTER_PW;
#our $CLUSTER_PW_FILE='/etc/clusterd.password';
#our $OLD_CLUSTER_PW_FILE='/root/clusterd.password';
#our $CLUSTER_PW_TIMESTAMP=0;

# upgrade : move cluster password file from /root to /etc
#
# Returns the file info of the new file, or nothing when the copy failed
# (the old file is kept in that case).
sub _upgrade_cluster_pw_file {
    warn "upgrading cluster password file location $OLD_CLUSTER_PW_FILE -> $CLUSTER_PW_FILE\n";

    my $in=IO::File->new($OLD_CLUSTER_PW_FILE,'<');
    unless (defined $in) {
        warn "$OLD_CLUSTER_PW_FILE: $!\n";
        return;
    }
    my $data;
    unless (defined $in->read($data,1024)) {
        warn "$OLD_CLUSTER_PW_FILE: $!\n";
        return;
    }
    $in->close;

    my $out=IO::File->new($CLUSTER_PW_FILE,O_WRONLY|O_CREAT,0600);
    unless (defined $out) {
        warn "$CLUSTER_PW_FILE: $!\n";
        return;
    }
    unless ($out->write($data) and $out->close) {
        warn "$CLUSTER_PW_FILE: $!\n";
        unlink $CLUSTER_PW_FILE;    # do not leave a truncated password behind
        return;
    }

    my $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE);
    defined $st or die "$CLUSTER_PW_FILE: $!\n";
    unlink $OLD_CLUSTER_PW_FILE;
    return $st;
}

# (Re)load $CLUSTER_PW from $CLUSTER_PW_FILE when it is not loaded yet or the
# file's mtime changed, and schedule the next check in 60 seconds.
#
# Returns true when a cluster password is loaded afterwards. The timer is
# re-armed on every path, so a transient failure does not stop the periodic
# check.
sub sync_cluster_pw {
    my $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE);

    if (!$st && -e $OLD_CLUSTER_PW_FILE) {
        $st=_upgrade_cluster_pw_file();
    }

    if ($st) {
        if (!defined $CLUSTER_PW or $CLUSTER_PW_TIMESTAMP != $st->mtime) {
            my $fh=IO::File->new($CLUSTER_PW_FILE,'<');
            my $pw;
            if (defined $fh and defined $fh->read($pw,1024)) {
                defined $CLUSTER_PW and warn "update cluster password\n";
                $CLUSTER_PW=$pw;
                $CLUSTER_PW_TIMESTAMP=$st->mtime;
            } else {
                defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n";
                $CLUSTER_PW=undef;
            }
        }
    } else {
        defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n";
        $CLUSTER_PW=undef;
    }
    My::Select::timeout(60,\&sync_cluster_pw);
    return defined $CLUSTER_PW;
}

#------------------------------------------------------------

# area routing

# hostname of an area router => broadcast address it relays to
our %AREA_ROUTER=
(
    lol => '141.14.31.255',
#   orkrist=> '10.14.0.255',
);

our $area_socket;

# When this host is listed in %AREA_ROUTER, listen on $UDP_PORT+1 and relay
# everything received there (see area_message). Does nothing on other hosts.
sub init_area {
    exists $AREA_ROUTER{$my_hostname} or return;
    warn "I am area router for $AREA_ROUTER{$my_hostname}\n";

    $area_socket=IO::Socket::INET->new(Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n";
    My::Select::reader($area_socket,sub { area_message($area_socket) });
}

# Select callback: relay one datagram from the area socket to our network.
# The payload is forwarded unchanged.
sub area_message {
    my ($area_socket)=@_;
    my $data;
    if (defined $area_socket->recv($data,$UDP_MAX)) {
        $donald_s->send_data($AREA_ROUTER{$my_hostname},$UDP_PORT,$data); # broadcast to our network
    } else {
        warn "area socket: $!\n";
    }
    My::Select::reader_requeue();
}

# Sign @args with the cluster password and send the message to port
# $UDP_PORT+1 of every area router. Silently does nothing when no cluster
# password is loaded.
sub udp_broadcast_message {
    my ($donald_s,@args)=@_;

    defined $CLUSTER_PW or return;
    my $data=sign($CLUSTER_PW,encode(@args));
    for my $router (keys %AREA_ROUTER) {
        $donald_s->send_data($router,$UDP_PORT+1,$data);
    }
}

# Returns %AREA_ROUTER as "host => address,host => address".
sub area_config_as_string {
    return join(',',map { $_.' => '.$AREA_ROUTER{$_} } keys %AREA_ROUTER);
}

#------------------------------------------------------------

our $PROG_FILE;  # saved $0 - may be relative path
our $PROG_MTIME;

# Remember the mtime of the program file on the first call; on later calls
# re-exec the program when the mtime changed. Re-arms itself every 60 seconds,
# also when the file cannot be examined right now (e.g. while it is being
# replaced) or when the exec fails.
sub check_progfile_status {
    defined $PROG_FILE or $PROG_FILE=$0;
    my @f=lstat $PROG_FILE;
    if (!@f) {
        warn "$PROG_FILE: $!\n";
    } elsif (!defined $PROG_MTIME) {
        $PROG_MTIME=$f[9];
    } elsif ($f[9] != $PROG_MTIME) {
        warn "progfile $PROG_FILE has changed - upgrade restart from version ".version_info()."\n";
        exec $PROG_FILE,'--daemon',($options{'foreground'}?'--foreground':()),($options{'syslog'}?'--syslog':())
            or warn "exec $PROG_FILE: $!\n";
    }
    My::Select::timeout(60,\&check_progfile_status);
}

# Returns e.g. 'V1.31 - 20090617-155314': $REVISION and the local time of
# $PROG_MTIME, or '?' instead of the time while $PROG_MTIME is unknown.
sub version_info {
    my $stamp='?';
    if (defined $PROG_MTIME) {
        my @f=localtime($PROG_MTIME);
        $stamp=sprintf '%4d%02d%02d-%02d%02d%02d',$f[5]+1900,$f[4]+1,$f[3],$f[2],$f[1],$f[0];
    }
    return "V$REVISION - ".$stamp;
}

#------------------------------------------------------------

# Write a load snapshot to /var/log/pload_debug.txt: $why with a timestamp,
# the running-process samples, derived load/capacity figures and the output
# of uptime, ps, /proc/stat and /proc/swaps. Returns silently when the file
# cannot be opened.
sub pload_debug {
    my ($why)=@_;
    open my $out,'>','/var/log/pload_debug.txt' or return;
    print {$out} "$why at ".scalar(localtime),"\n";
    my $running_proc=running_proc();
    my $running_proc_10=running_proc_10();
    printf {$out} ("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc);
    printf {$out} ("RPROC_10 : %s : %.2f\n",join(',',map { sprintf('%.2f',$_) } @proc_running_10),$running_proc_10);
    $CPUS or init_cpuinfo();
    if ($CPUS) {
        printf {$out} ("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",
            $running_proc,$CPUS,$BOGOMIPS,$running_proc/$CPUS,
            $running_proc>=$CPUS ? 0 : ($CPUS-$running_proc)*$BOGOMIPS);
        printf {$out} ("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",
            $running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,
            $running_proc_10>=$CPUS ? 0 : ($CPUS-$running_proc_10)*$BOGOMIPS);
    }
    # the commands append to the same file while our handle is still open
    system "uptime>>/var/log/pload_debug.txt";
    system "ps -AlfT>>/var/log/pload_debug.txt";
    system "cat /proc/stat>>/var/log/pload_debug.txt";
    system "cat /proc/swaps>>/var/log/pload_debug.txt";
    close $out;
}

# Timer callback: re-arms itself every 600 seconds and warns when the
# 10-sample process load exceeds 150% of the CPU count.
sub check_overload() {
    My::Select::timeout_requeue(600);
    if ($CPUS) {
        my $pload_10=running_proc_10()/$CPUS;
        $pload_10>1.5 and warn(sprintf("pload>150%% (%3.1f%%)\n",$pload_10*100));
    }
}

#####################################################

my $slave=0;       # number of remote connections we still wait for
my $exit_value=0;  # exit status of the client commands

# Expand netgroups to the list of host names they contain.
#
# Entries of the form (host,user,domain) contribute the host, bare names are
# expanded as nested netgroups. Every netgroup is looked up only once, so
# netgroups referencing each other do not loop.
sub expand_netgroup_hosts {
    my (@netgroups)=@_;
    my %seen;
    my @hosts;

    while (@netgroups) {
        my $ng=pop @netgroups;
        next if $seen{$ng}++;

        # list form: the netgroup name never passes through a shell
        my $ypmatch;
        unless (open $ypmatch,'-|','ypmatch',$ng,'netgroup') {
            warn "ypmatch $ng: $!\n";
            next;
        }
        my $output=do { local $/; <$ypmatch> };
        close $ypmatch;
        defined $output or next;

        for my $entry (split ' ',$output) {
            if ($entry=~/^\((\S+),.*,.*\)$/) {
                push @hosts,$1;
            } elsif ($entry=~/^[a-z0-9_]+$/i) {
                push @netgroups,$entry;
            } else {
                warn "ignored entry $entry\n";
            }
        }
    }
    return @hosts;
}

# Run @cmd on $host via the cluster protocol and relay its output through
# cmd_rx(). Enters the select loop and does not return on success; the
# process exits from the receive callback once the connection is closed.
sub exec_at {
    my ($host,@cmd)=@_;
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    my $s=IO::Socket::INET->new(PeerAddr=>$host,PeerPort=>$CLP_PORT);
    defined $s or die "$host: $!\n";
    clp_send_message($s,'CMD',@cmd) or die "$host: cannot send command: $!\n";
    my $pbuffer='';
    my $olbuffer='';
    my $elbuffer='';
    My::Select::reader($s,sub { cmd_rx($host,$s,\$pbuffer,\$olbuffer,\$elbuffer) });
    $slave=1;
    My::Select::run();
}

# Ask every host of netgroup AMD_SHORT (except 'tux') for its lsof lines
# matching $pattern and print them prefixed with the host name. Enters the
# select loop when at least one host could be contacted; the process then
# exits from the receive callback after the last connection was closed.
sub lsof {
    my ($pattern)=@_;
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    #for my $host ('theinternet') {
    for my $host (sort(expand_netgroup_hosts('AMD_SHORT'))) {
        next if $host eq 'tux';
        my $s=IO::Socket::INET->new(PeerAddr=>$host,PeerPort=>$CLP_PORT);
        unless (defined $s) {
            warn "$host: $!\n";
            next;
        }
        unless (clp_send_message($s,'LSOF',$pattern)) {
            warn "$host: cannot send request: $!\n";
            next;
        }
        my $pbuffer='';
        my $olbuffer='';
        My::Select::reader($s,sub { lsof_rx($host,$s,\$pbuffer,\$olbuffer) });
        $slave++;
    }
    My::Select::run() if $slave;
}

# Print every complete line of $$bufref.$data to $fh, each prefixed with
# $prefix, and keep the unterminated rest in $$bufref for the next call.
sub _print_complete_lines {
    my ($fh,$prefix,$bufref,$data)=@_;
    my $text=$$bufref.$data;
    for my $line ($text=~/([^\n]*\n)/g) {
        print {$fh} "$prefix$line";
    }
    ($$bufref)=$text=~/([^\n]*)\z/;
}

# Print what is left in $$bufref (output that did not end with a newline).
sub _flush_partial_line {
    my ($fh,$prefix,$bufref)=@_;
    length($$bufref) or return;
    print {$fh} "$prefix$$bufref\n";
    $$bufref='';
}

# Common receive path of the client side (lsof_rx, cmd_rx).
#
#   $host, $s - peer name and its socket
#   $pbufref  - reassembly buffer for the length-prefixed frames
#   $on_msg   - called with the payload of every complete frame
#   $on_eof   - called once when the connection ends
#
# When the connection ends (EOF or receive error) the connection counter
# $slave is decremented and the process exits with $exit_value after the
# last connection. A receive error sets $exit_value to 1.
sub _client_rx {
    my ($host,$s,$pbufref,$on_msg,$on_eof)=@_;
    my $data;
    my $received=defined $s->recv($data,$TCP_MAX*2);
    if (!$received && ($!{EINTR} || $!{EAGAIN})) {
        My::Select::reader_requeue();   # nothing read, try again
        return;
    }
    if (!$received) {
        warn "$host: $!\n";
        $exit_value=1;
    }
    if (!$received || !length($data)) {
        $on_eof->();
        close $s;
        --$slave;
        exit $exit_value unless $slave;
        return;
    }
    $$pbufref.=$data;
    while (length($$pbufref)>=2) {
        my $len=unpack('n',$$pbufref);
        last if length($$pbufref)<2+$len;   # frame not complete yet
        my $msg=substr($$pbufref,2,$len);
        $$pbufref=substr($$pbufref,2+$len);
        $on_msg->($msg);
    }
    My::Select::reader_requeue();
}

# Select callback for one lsof connection, see lsof().
sub lsof_rx {
    my ($host,$s,$pbufref,$olbufref)=@_;
    _client_rx($host,$s,$pbufref,
        sub { lsof_msg($host,$s,$_[0],$olbufref) },
        sub { _flush_partial_line(\*STDOUT,"$host ",$olbufref) },
    );
}

# Print the lsof output contained in one frame as "host line".
sub lsof_msg {
    my ($host,$s,$data,$olbufref)=@_;
    _print_complete_lines(\*STDOUT,"$host ",$olbufref,$data);
}

# Select callback for the connection of a remote command, see exec_at().
sub cmd_rx {
    my ($host,$s,$pbufref,$olbufref,$elbufref)=@_;
    _client_rx($host,$s,$pbufref,
        sub { cmd_msg($host,$s,$_[0],$olbufref,$elbufref) },
        sub {
            _flush_partial_line(\*STDOUT,"$host: ",$olbufref);
            _flush_partial_line(\*STDERR,"$host: ",$elbufref);
        },
    );
}

# Handle one frame of a remote command. The first byte selects the channel:
#   'O' - stdout data, printed to STDOUT as "host: line"
#   'E' - stderr data, printed to STDERR as "host: line"
#   'X' - wait status of the command; non-zero sets $exit_value to 1
sub cmd_msg {
    my ($host,$s,$data,$olbufref,$elbufref)=@_;

    my $channel=substr($data,0,1);
    my $payload=substr($data,1);
    if ($channel eq 'X') {
        $payload!=0 and $exit_value=1;
    } elsif ($channel eq 'O') {
        _print_complete_lines(\*STDOUT,"$host: ",$olbufref,$payload);
    } elsif ($channel eq 'E') {
        _print_complete_lines(\*STDERR,"$host: ",$elbufref,$payload);
    }
    return;
}

our $SYS_SENDFILE=40; # /usr/include/asm/unistd_64.h

# Thin wrapper around the sendfile system call (by number $SYS_SENDFILE).
# $out_fd and $in_fd are Perl file handles. Returns the syscall result, or
# undef (with $! set by the syscall) when it is negative.
# NOTE(review): $offset is handed to the kernel as is; the only caller in
# this file passes 0 - confirm before using any other value.
sub sendfile {
    my ($out_fd,$in_fd,$offset,$count)=@_;
    my $ret=syscall($SYS_SENDFILE,fileno($out_fd),fileno($in_fd),$offset,$count);
    return $ret<0 ? undef : $ret;
}

# Handler for 'PULL': send the requested file over the socket.
#
# The file is only sent when it is still a plain file with the size and
# mtime of the request. The transfer has to finish within 5 seconds. Returns
# true when the transfer was started (the socket is then owned by the
# transfer), false otherwise.
sub clp_rx_PULL {
    my ($s,$st_want)=@_;

    my $name=$st_want->name;
    my $st_is=Donald::FileInfo->lstat($name);
    if (!defined $st_is or $st_is->type ne 'F' or $st_is->size != $st_want->size or $st_is->mtime != $st_want->mtime) {
        warn $name." requested by ".$s->peerhost.": no longer available\n";
        return;
    }
    my $fh;
    unless (open $fh,'<',$name) {
        warn "$name: $!\n";
        return;
    }
    my $bytes=$st_is->size;

    my $cb_tmo;
    # end of transfer, successful or not: drop the timer, release both handles
    my $finish=sub {
        My::Select::timeout_cancel($cb_tmo);
        close $fh;
        close $s;
    };
    $cb_tmo=sub {
        warn "$name: timeout while sending\n";
        My::Select::cancel_handle($s);
        close $fh;
        close $s;
    };
    my $cb_write=sub {
        my $sent=sendfile($s,$fh,0,$bytes);
        unless (defined $sent) {
            if ($!{EAGAIN} || $!{EINTR}) {
                My::Select::writer_requeue();
                return;
            }
            warn "$name: sendfile: $!\n";
            $finish->();
            return;
        }
        $bytes-=$sent;
        if ($bytes<=0) {
            $finish->();
        } elsif ($sent==0) {
            # file got shorter under us; retrying would spin until the timeout
            warn "$name: unexpected end of file\n";
            $finish->();
        } else {
            My::Select::writer_requeue();
        }
    };
    My::Select::timeout(5,$cb_tmo);
    My::Select::writer($s,$cb_write);
    return 1;
}

# Give the temporary file owner and mode of $st_want, move it to $filename
# and set the mtime. Warns and returns false on the first failing step,
# returns true when the file is in place.
sub _install_tmp_file {
    my ($st_want,$tmp_filename,$filename)=@_;
    unless (chown($st_want->uid,$st_want->gid,$tmp_filename)) {
        warn "$tmp_filename: $!\n";
        return;
    }
    unless (chmod($st_want->perm,$tmp_filename)) {
        warn "$tmp_filename: $!\n";
        return;
    }
    unless (rename($tmp_filename,$filename)) {
        warn "rename $tmp_filename $filename: $!\n";
        return;
    }
    utime($st_want->mtime,$st_want->mtime,$filename) or warn "$filename: utime: $!\n";
    return 1;
}

# Handler for a 'push' broadcast: make the local file identical to the
# announced one.
#
#   $ip      - address of the announcing host; our own announcements are ignored
#   $st_want - file info of the announced file
#
# Nothing is done when the local file already matches in type, size, mtime,
# owner and mode. Empty files are created locally; otherwise the content is
# pulled from $ip into "$filename.tmp" and renamed into place when complete.
sub udp_rx_push {
    my ($ip,$st_want)=@_;

    my $filename=$st_want->name;
    my $tmp_filename="$filename.tmp";

    $ip eq $my_ip and return;

    my $st_is=Donald::FileInfo->lstat($st_want->name);

    unless ($st_want->type eq 'F') {
        warn "$filename: type ".$st_want->type." not yet implemented\n";
        return;
    }

    if ($st_is
        && $st_is->type eq 'F'
        && $st_is->size == $st_want->size
        && $st_is->mtime == $st_want->mtime
        && $st_is->uid == $st_want->uid
        && $st_is->gid == $st_want->gid
        && $st_is->perm == $st_want->perm
    ) {
        warn "$filename: already okay\n";
        return;
    }

    # O_TRUNC: a temporary file left over from an aborted transfer must not
    # contribute its old content
    my $tmp_flags=O_WRONLY|O_CREAT|O_TRUNC;

    if ($st_want->size==0) {
        my $fh=IO::File->new($tmp_filename,$tmp_flags,0);
        defined $fh or return warn "$tmp_filename: $!\n";
        $fh->close or return warn "$tmp_filename: $!\n";
        _install_tmp_file($st_want,$tmp_filename,$filename) and warn "installed (empty) $filename\n";
        return;
    }

    my $s;
    $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,$TCP_TIMEOUT,sub {
        $! and return warn "$ip: $!\n";
        send_tcp_cp($s,sub {
            $! and return warn "$ip: $!\n";
            my $fh=IO::File->new($tmp_filename,$tmp_flags,0);
            defined $fh or return warn "$tmp_filename: $!\n";

            my $cb;
            my $bytes=$st_want->size;
            $cb=sub {
                # note, we need to break the circular references $cb of our caller, if no longer needed
                my ($buf)=@_;
                if ($!) { warn "$ip: $!\n"; $cb=undef; return; }
                if (length($buf)==0) { warn "$ip: EOF\n"; $cb=undef; return; }
                unless (print {$fh} $buf) {
                    warn "$tmp_filename: $!\n";
                    $cb=undef;
                    return;
                }
                $bytes-=length($buf);
                if ($bytes>0) {
                    My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT);
                    return;
                }
                $cb=undef;
                close $fh or return warn "$tmp_filename: $!\n";
                _install_tmp_file($st_want,$tmp_filename,$filename) and warn "installed $filename\n";
            };
            My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT);
        },$TCP_TIMEOUT,'PULL',$st_want);
    });
}

# Load the cluster password and open the UDP socket used for broadcasts.
# Stores the socket in $donald_s and returns it; dies on failure.
sub _open_broadcast_socket {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=My::Select::INET->new(Proto=>'udp') or die "$!\n";
    return $donald_s;
}

# 'push files...' command: announce each file to the cluster. All names must
# be absolute paths of existing, readable plain files; dies otherwise.
sub cmd_push {
    my @files=@_;
    for my $filename (@files) {
        $filename =~ m{^/} or die "$filename: please use absolute path\n";
        -e $filename or die "$filename: no such file\n";
    }
    _open_broadcast_socket();
    for my $filename (@files) {
        my $st=Donald::FileInfo->lstat($filename);
        defined $st or die "$filename: $!\n";
        $st->type eq 'F' or die "$filename: only plain files currently supported\n";
        open my $test,'<',$filename or die "$filename: $!\n";   # readable at all?
        udp_broadcast_message($donald_s,'push',$my_ip,$st);
    }
}

#------------------------------------------------------------

# interpolating here-document, so that $0 shows the program name
use constant USAGE => <<"__EOF__";

usage: $0 [options]

    --push file                 # broadcast this file
    --push-amd-tar              # broadcast /etc/amd
    --send-restart              # broadcast a restart request to all nodes
    --exec mkmotd               # execute /usr/sbin/mkmotd.pl on all nodes
    --exec \@node cmd [args...]  # execute cmd on node
    --flush-gidcache            # flush rpc auth.unix.gid cache on all nodes
    --make-automaps             # execute /usr/sbin/make-automaps on all nodes

    --lsof=pattern

    --kill                      # try to kill a running server

    --daemon                    # start a daemon
        --kill                  # try to kill previous server first
        --foreground            # stay in foreground, log to stderr
        --syslog                # log to syslog instead of stderr

    push files....              # push files over tcp

__EOF__

use Getopt::Long;
GetOptions
(
    'kill'           => \$options{'kill'},
    'daemon'         => \$options{'daemon'},
    'push=s'         => \$options{'push'},
    'exec=s'         => \$options{'exec'},
    'foreground'     => \$options{'foreground'},
    'syslog'         => \$options{'syslog'},
    'push-amd-tar'   => \$options{'push_amd_tar'},
    'send-restart'   => \$options{'send-restart'},
    'flush-gidcache' => \$options{'flush-gidcache'},
    'make-automaps'  => \$options{'make-automaps'},
    'lsof=s'         => \$options{'lsof'},

) or die USAGE;

# The first matching mode wins.
if (defined $options{'push'}) {
    push_file(_open_broadcast_socket(),$options{'push'});
} elsif (defined $options{'exec'}) {
    if (substr($options{'exec'},0,1) eq '@') {
        # --exec @node cmd [args...] : the node name must not be empty
        length($options{'exec'})>1 or die USAGE;
        @ARGV>=1 or die USAGE;
        exec_at(substr($options{'exec'},1),@ARGV);
    } else {
        send_exec(_open_broadcast_socket(),$options{'exec'});
    }
} elsif (defined $options{'push_amd_tar'}) {
    push_amd_tar(_open_broadcast_socket());
} elsif (defined $options{'send-restart'}) {
    udp_broadcast_message(_open_broadcast_socket(),'restart');
} elsif (defined $options{'flush-gidcache'}) {
    udp_broadcast_message(_open_broadcast_socket(),'flush-gidcache');
} elsif (defined $options{'make-automaps'}) {
    udp_broadcast_message(_open_broadcast_socket(),'make-automaps');
} elsif (defined $options{'daemon'}) {
    $options{'kill'} and Donald::Tools::kill_previous_server('clusterd') and sleep 2;

    $SIG{PIPE}='IGNORE';

    $donald_s=My::Select::INET->new(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n";
    $donald_s->receive_data(\&udp_message,$donald_s);

    unless ($options{'foreground'}) {
        my $pid=fork;
        defined $pid or die "$!\n";
        $pid and exit 0;
    }

    if ($options{'syslog'} or not $options{'foreground'}) {
        openlog('clusterd','pid','daemon');
        Sys::Syslog::setlogsock('unix'); # with 'native' we get EOLs in the logfile, option "noeol" doesn't work
        # the message is data, not a format: a '%' in it must be logged as is
        $SIG{__WARN__} = sub { syslog('warning','%s',join('',@_)); };
        $SIG{__DIE__}  = sub { syslog('crit','%s',join('',@_)); syslog('crit','exiting'); exit 1; };
        open (STDOUT,'>','/dev/null');
        open (STDERR,'>','/dev/null');
        open (STDIN,'<','/dev/null');
    }

    check_progfile_status();
    warn "server started - ".version_info()."\n";
    init_area();
    mgmt_init();
    clp_init();

    sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n";

    My::Select::timeout(60,\&purge_old_receiver);
    My::Select::timeout(rand(60),\&send_stat);
    My::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha;
    $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init();
    $my_hostname eq 'lol' and My::NetlogReceiver::init();
    My::Select::timeout(600,\&check_overload);

    My::Select::run();
} elsif ($options{'lsof'}) {
    lsof($options{'lsof'});
} elsif ($options{'kill'}) {
    Donald::Tools::kill_previous_server('clusterd');
} else {
    @ARGV or die USAGE;
    my ($cmd,@args)=@ARGV;
    if ($cmd eq 'push') {
        @args>0 or die USAGE;
        cmd_push(@args);
    } else {
        die USAGE;
    }
}
--git a/clusterd/clusterd.service b/clusterd/clusterd.service new file mode 100644 index 0000000..79f0beb --- /dev/null +++ b/clusterd/clusterd.service @@ -0,0 +1,14 @@ +[Unit] +Description=ClusterDonald +Requires=network.target +After=network.target + +[Service] +ExecStart=/usr/sbin/clusterd --daemon --foreground --kill --syslog +StandardOutput=syslog +Restart=always +RestartSec=10s + +[Install] +WantedBy=multi-user.target + diff --git a/install.sh b/install.sh index 4954b52..798c1f6 100755 --- a/install.sh +++ b/install.sh @@ -123,4 +123,6 @@ install_data misc_systemd_units/getcams.service "$DESTDIR$systemdunitdi install_exec blink/blinkd.py "$DESTDUR$udev_helperdir/blinkd.py" install_data blink/blinkd.service "$DESTDIR$systemdunitdir/blinkd.service" install_data blink/51-blink.rules "$DESTDIR$udev_rulesdir/51-blink.rules" +install_data clusterd/clusterd.service "$DESTDIR$systemdunitdir/clusterd.service" +install_exec clusterd/clusterd "$DESTDIR$usr_sbindir/clusterd" exit