diff --git a/clusterd/clusterd b/clusterd/clusterd index 1f9cfb8..afcb828 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -16,82 +16,56 @@ use Storable; use Digest::MD5; sub hostname { - our $hostname; - unless (defined $hostname) { - $hostname=lc `/bin/hostname`; - chomp($hostname); - $hostname =~ s/\.molgen\.mpg\.de$//; - } - return $hostname; + our $hostname; + unless (defined $hostname) { + $hostname=lc `/bin/hostname`; + chomp($hostname); + $hostname =~ s/\.molgen\.mpg\.de$//; + } + return $hostname; } sub machine { - our $machine; - chomp($machine=`uname -m`) unless defined $machine; - return $machine; -} - -sub is_alpha { - return machine() eq 'alpha'; + our $machine; + chomp($machine=`uname -m`) unless defined $machine; + return $machine; } sub uptime { - open U,'<','/proc/uptime' or die "/proc/uptime: $!\n"; - my $data; - sysread(U,$data,1024); - close U; - $data=~ /^(\d+\.?\d*)/ or die "bad data from /proc/uptime: $data\n"; - return $1+0; + open U,'<','/proc/uptime' or die "/proc/uptime: $!\n"; + my $data; + sysread(U,$data,1024); + close U; + $data=~ /^(\d+\.?\d*)/ or die "bad data from /proc/uptime: $data\n"; + return $1+0; } sub encode { - return Storable::nfreeze([@_]); + return Storable::nfreeze([@_]); } sub sign { - my ($password,$data)=@_; - return Digest::MD5::md5($password.$data).$data; # 16 byte prefix + my ($password,$data)=@_; + return Digest::MD5::md5($password.$data).$data; # 16 byte prefix } -sub check_sign { # signed-data -> undef or signed-data -> data - my ($password,$data)=@_; - length $data>16 or return undef; - my $rx_digest=substr($data,0,16); - my $signature=Digest::MD5::md5($password.substr($data,16)); - $rx_digest eq $signature or return undef; - return substr($data,16); +sub check_sign { # signed-data -> undef or signed-data -> data + my ($password,$data)=@_; + length $data>16 or return undef; + my $rx_digest=substr($data,0,16); + my $signature=Digest::MD5::md5($password.substr($data,16)); + $rx_digest eq $signature or return undef; + return substr($data,16); } sub decode { - my ($data)=@_; - my $msg; - eval { - $msg=Storable::thaw($data); - }; - $@ and return undef; - return @$msg; -} - -sub kill_previous_server { # kill_previous_server('clusterd') - my ($command)=@_; - my $ret=0; - - # quickfix - dont kill our parent which might be the init.d/script with the same name.... - - my $ppid; - for (`ps -o ppid,comm -p $$`) { - $ppid=$1 if /(\d+)/; - } - - for (`ps -Ao pid,comm`) { - my @F=split; - if ($F[1] eq $command && $F[0] ne $$ && $F[0] ne $ppid) { - kill 1=>$F[0]; - warn "stopped $command pid $F[0]\n"; - $ret=1; - } - } - return $ret; + my ($data)=@_; + my $msg; + eval { + $msg=Storable::thaw($data); + }; + $@ and return undef; + return @$msg; } #------------------------------------- @@ -114,15 +88,15 @@ use Class::Struct (map {$_=>'$'} qw(name dev ino type perm nlink uid gid rdev si use Fcntl ':mode'; sub fn_escape { - my ($fn)=@_; - $fn=~s/([[:^graph:]\\])/'\x'.sprintf('%02x',ord($1))/ge; - return $fn; + my ($fn)=@_; + $fn=~s/([[:^graph:]\\])/'\x'.sprintf('%02x',ord($1))/ge; + return $fn; } sub fn_unescape { - my ($fn)=@_; - $fn=~s/\\x([0-9a-f]{0,2})/chr(hex($1))/gie; - return $fn; + my ($fn)=@_; + $fn=~s/\\x([0-9a-f]{0,2})/chr(hex($1))/gie; + return $fn; } # @@ -144,83 +118,83 @@ sub fn_unescape { # S path perm uid gid - - hardlink sub name_escaped { - my ($self)=shift; - return fn_escape($self->name); + my ($self)=shift; + return fn_escape($self->name); } sub fileid { - my ($self)=shift; - return($self->dev.'.'.$self->ino); + my ($self)=shift; + return($self->dev.'.'.$self->ino); } sub export_index { - my ($self)=@_; - my $type=$self->type; - return ( - $type, - fn_escape($self->name), - $self->perm, - $self->uid, - $self->gid, - ( - $type eq 'F' ? $self->size : - $type eq 'L' ? fn_escape($self->target) : - $type eq 'B' || $type eq 'C' ? $self->rdev : - '-' - ), - $type eq 'F'||$type eq 'D' ? $self->mtime : '-', - ); + my ($self)=@_; + my $type=$self->type; + return ( + $type, + fn_escape($self->name), + $self->perm, + $self->uid, + $self->gid, + ( + $type eq 'F' ? $self->size : + $type eq 'L' ? fn_escape($self->target) : + $type eq 'B' || $type eq 'C' ? $self->rdev : + '-' + ), + $type eq 'F'||$type eq 'D' ? $self->mtime : '-', + ); } sub import_index { - my ($class,@F)=@_; - my $type=$F[0]; - return bless [ - fn_unescape($F[1]), # name - 0,0, # dev,ino - $type, # type - $F[2], # perm - 0, # nlink - $F[3],$F[4], # uid,gid - ($type eq 'B' || $type eq 'C' ? $F[5] : 0), # rdev - ($type eq 'F' ? $F[5] : 0 ), # size - ($type eq 'F'||$type eq 'D' ? $F[6] : 0 ), # mtime - ($type eq 'L' ? fn_unescape($F[5]) : ''), # target - ], - $class; + my ($class,@F)=@_; + my $type=$F[0]; + return bless [ + fn_unescape($F[1]), # name + 0,0, # dev,ino + $type, # type + $F[2], # perm + 0, # nlink + $F[3],$F[4], # uid,gid + ($type eq 'B' || $type eq 'C' ? $F[5] : 0), # rdev + ($type eq 'F' ? $F[5] : 0 ), # size + ($type eq 'F'||$type eq 'D' ? $F[6] : 0 ), # mtime + ($type eq 'L' ? fn_unescape($F[5]) : ''), # target + ], + $class; } sub lstat { - my ($class,$filename)=@_; - my $target; - my @f; - - unless (@f=lstat $filename) { - $!==2 and return undef; # ENOENT - die "$filename: $!\n"; - } - if (-l _) { - defined ($target=readlink($filename)) or die "$filename: $!\n"; - } - my $type = - S_ISREG($f[2]) ? 'F' : - S_ISDIR($f[2]) ? 'D' : - S_ISLNK($f[2]) ? 'L' : - S_ISBLK($f[2]) ? 'B' : - S_ISCHR($f[2]) ? 'C' : - S_ISFIFO($f[2]) ? 'P' : - S_ISSOCK($f[2]) ? 'S' : - die ("$filename: unsupported file type\n"); - return bless [ - $filename, # name - @f[0,1], # dev,ino - $type, # type - S_IMODE($f[2]), # perm - @f[3..7], # nlink,uid,gid,rdev,size - $f[9], # mtime - ($type eq 'L' ? $target : '') # target - ], - $class; + my ($class,$filename)=@_; + my $target; + my @f; + + unless (@f=lstat $filename) { + $!==2 and return undef; # ENOENT + die "$filename: $!\n"; + } + if (-l _) { + defined ($target=readlink($filename)) or die "$filename: $!\n"; + } + my $type = + S_ISREG($f[2]) ? 'F' : + S_ISDIR($f[2]) ? 'D' : + S_ISLNK($f[2]) ? 'L' : + S_ISBLK($f[2]) ? 'B' : + S_ISCHR($f[2]) ? 'C' : + S_ISFIFO($f[2]) ? 'P' : + S_ISSOCK($f[2]) ? 'S' : + die ("$filename: unsupported file type\n"); + return bless [ + $filename, # name + @f[0,1], # dev,ino + $type, # type + S_IMODE($f[2]), # perm + @f[3..7], # nlink,uid,gid,rdev,size + $f[9], # mtime + ($type eq 'L' ? $target : '') # target + ], + $class; } #------------------------------------- @@ -228,124 +202,115 @@ package My::Select; our $time=Donald::Tools::uptime(); -sub My::Select::time -{ - return $time; +sub My::Select::time { + return $time; } -our @TIMER=(); # ( [duetime,cb] , ... ) sorted by time +our @TIMER=(); # ( [duetime,cb] , ... ) sorted by time our $active_timer_cb; -sub timeout # cb=Select::timeout(seconds,cb) -{ - my ($delta,$cb)=@_; - my $duetime=$time+$delta; - @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); - return $cb; +sub timeout { # cb=Select::timeout(seconds,cb) + my ($delta,$cb)=@_; + my $duetime=$time+$delta; + @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); + return $cb; } -sub timeout_cancel # cb=Select::timeout_cancel(cb) -{ - my ($cb)=@_; - @TIMER=grep {$_->[1] != $cb} @TIMER; - return $cb; +sub timeout_cancel { # cb=Select::timeout_cancel(cb) + my ($cb)=@_; + @TIMER=grep {$_->[1] != $cb} @TIMER; + return $cb; } -sub timeout_requeue {timeout(shift,$active_timer_cb); } # Select::timeout_requeue(seconds) +sub timeout_requeue {timeout(shift,$active_timer_cb); } # Select::timeout_requeue(seconds) #------------------------------------ -our @READER; # ( [Handle,cb] , ... ) +our @READER; # ( [Handle,cb] , ... ) our @WRITER; our @EXCEPT; -our $active_io; # [Handle,cb] +our $active_io; # [Handle,cb] -sub reader # cb = Select::reader(Handle,cb) -{ - my ($handle,$cb)=@_; - push @READER,[$handle,$cb]; - return $cb; +sub reader { # cb = Select::reader(Handle,cb) + my ($handle,$cb)=@_; + push @READER,[$handle,$cb]; + return $cb; } -sub writer # cb = Select::writer(Handle,cb) -{ - my ($handle,$cb)=@_; - push @WRITER,[$handle,$cb]; - return $cb; +sub writer { # cb = Select::writer(Handle,cb) + my ($handle,$cb)=@_; + push @WRITER,[$handle,$cb]; + return $cb; } -sub except # cb = Select::except(Handle,cb) -{ - my ($handle,$cb)=@_; - push @EXCEPT,[$handle,$cb]; - return $cb; +sub except { # cb = Select::except(Handle,cb) + my ($handle,$cb)=@_; + push @EXCEPT,[$handle,$cb]; + return $cb; } sub reader_requeue {push @READER,$active_io} sub writer_requeue {push @WRITER,$active_io} sub except_requeue {push @EXCEPT,$active_io} -sub cancel # $cb = Select::cancel(cb) -{ - my ($cb)=@_; - defined $cb or $cb=$active_io->[1]; - @READER=grep {$_->[1] != $cb} @READER; - @WRITER=grep {$_->[1] != $cb} @WRITER; - @EXCEPT=grep {$_->[1] != $cb} @EXCEPT; -} - -sub cancel_handle -{ - my ($handle)=@_; - @READER=grep {$_->[0] != $handle} @READER; - @WRITER=grep {$_->[0] != $handle} @WRITER; - @EXCEPT=grep {$_->[0] != $handle} @EXCEPT; -} - -sub run -{ - while (1) { - $time=Donald::Tools::uptime(); - while (@TIMER && $TIMER[0]->[0]<=$time) { - $active_timer_cb=(shift @TIMER)->[1]; - $active_timer_cb->(); - } - $active_timer_cb=undef; - - my ($rvec,$wvec,$evec)=('','',''); - - for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 } ; - for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 } ; - for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ; - - my $ready=select($rvec,$wvec,$evec,1); - if ($ready>0) { - for (my $i=0;$i<@READER;$i++) { - if (vec($rvec,$READER[$i]->[0]->fileno,1)) { - $active_io=splice @READER,$i,1; - $active_io->[1]->(); - $active_io=undef; - last; - } - } - for (my $i=0;$i<@WRITER;$i++) { - if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) { - $active_io=splice @WRITER,$i,1; - $active_io->[1]->(); - $active_io=undef; - last; - } - } - for (my $i=0;$i<@EXCEPT;$i++) { - if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) { - $active_io=splice @EXCEPT,$i,1; - $active_io->[1]->(); - $active_io=undef; - last; - } - } - } - } +sub cancel { # $cb = Select::cancel(cb) + my ($cb)=@_; + defined $cb or $cb=$active_io->[1]; + @READER=grep {$_->[1] != $cb} @READER; + @WRITER=grep {$_->[1] != $cb} @WRITER; + @EXCEPT=grep {$_->[1] != $cb} @EXCEPT; +} + +sub cancel_handle { + my ($handle)=@_; + @READER=grep {$_->[0] != $handle} @READER; + @WRITER=grep {$_->[0] != $handle} @WRITER; + @EXCEPT=grep {$_->[0] != $handle} @EXCEPT; +} + +sub run { + while (1) { + $time=Donald::Tools::uptime(); + while (@TIMER && $TIMER[0]->[0]<=$time) { + $active_timer_cb=(shift @TIMER)->[1]; + $active_timer_cb->(); + } + $active_timer_cb=undef; + + my ($rvec,$wvec,$evec)=('','',''); + + for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 } ; + for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 } ; + for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ; + + my $ready=select($rvec,$wvec,$evec,1); + if ($ready>0) { + for (my $i=0;$i<@READER;$i++) { + if (vec($rvec,$READER[$i]->[0]->fileno,1)) { + $active_io=splice @READER,$i,1; + $active_io->[1]->(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@WRITER;$i++) { + if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) { + $active_io=splice @WRITER,$i,1; + $active_io->[1]->(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@EXCEPT;$i++) { + if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) { + $active_io=splice @EXCEPT,$i,1; + $active_io->[1]->(); + $active_io=undef; + last; + } + } + } + } } #-------------------------------------- @@ -354,52 +319,47 @@ package My::Select::INET ; use Carp; use IO::Socket::INET; -our $UDP_MAX=1472; # for broadcast on alphas +our $UDP_MAX = 1472; our $SOL_SOCKET=1; our $SO_ERROR=4; -sub get_socket_error -{ - my ($s)=@_; - return unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR)); +sub get_socket_error { + my ($s)=@_; + return unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR)); } -sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) -{ - my ($class,@args)=@_; - our $socket=new IO::Socket::INET (@args) or return undef; - return bless \$socket,$class; +sub new { # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) + my ($class,@args)=@_; + our $socket=new IO::Socket::INET (@args) or return undef; + return bless \$socket,$class; } -sub send_data -{ - my ($self,$ip,$port,$data)=@_; - my $ip_address=inet_aton($ip); - unless (defined $ip_address) {carp("can't resolve $ip\n");return undef} - unless (length($data)<=$UDP_MAX) {carp("message to long\n");return undef} - $$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n"; +sub send_data { + my ($self,$ip,$port,$data)=@_; + my $ip_address=inet_aton($ip); + unless (defined $ip_address) {carp("can't resolve $ip\n");return undef} + unless (length($data)<=$UDP_MAX) {carp("message to long\n");return undef} + $$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n"; } -sub reader -{ - my ($self,$sub)=@_; - My::Select::reader($$self,$sub); +sub reader { + my ($self,$sub)=@_; + My::Select::reader($$self,$sub); } -sub receive_data -{ - my ($self,$cb,@args)=@_; +sub receive_data { + my ($self,$cb,@args)=@_; - my $receive_data_cb=sub { - my $data; - my $peer = $$self->recv($data,$UDP_MAX); - my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); - my $udp_peer_addr=inet_ntoa($peer_iaddr); - My::Select::reader_requeue(); - $cb->($data,$udp_peer_addr,$udp_peer_port); - }; - My::Select::reader($$self,$receive_data_cb); + my $receive_data_cb=sub { + my $data; + my $peer = $$self->recv($data,$UDP_MAX); + my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); + my $udp_peer_addr=inet_ntoa($peer_iaddr); + My::Select::reader_requeue(); + $cb->($data,$udp_peer_addr,$udp_peer_port); + }; + My::Select::reader($$self,$receive_data_cb); } # send_tcp($socket,$data,$timeout,$cb); @@ -410,35 +370,35 @@ sub receive_data # all arguments required # sub send_tcp { - my ($s,$data,$timeout,$cb)=@_; - my $len=$s->send($data,0); - defined $len or return $cb->(); - if ($len==length($data)) { - $!=0; - $cb->(); - return; - } - my $pos=$len; - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(); - }; - my $cb_write=sub { - My::Select::timeout_cancel($cb_tmo); - my $len=send($s,substr($data,$pos),0); - defined $len or return $cb->(); - if ($len==length($data)-$pos) { - $!=0; - $cb->(); - return; - } - $pos+=$len; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer_requeue(); - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer($s,$cb_write); + my ($s,$data,$timeout,$cb)=@_; + my $len=$s->send($data,0); + defined $len or return $cb->(); + if ($len==length($data)) { + $!=0; + $cb->(); + return; + } + my $pos=$len; + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(); + }; + my $cb_write=sub { + My::Select::timeout_cancel($cb_tmo); + my $len=send($s,substr($data,$pos),0); + defined $len or return $cb->(); + if ($len==length($data)-$pos) { + $!=0; + $cb->(); + return; + } + $pos+=$len; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer_requeue(); + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer($s,$cb_write); } # $socket = connect_tcp ($ip,$port,$timeout,$cb) @@ -449,23 +409,23 @@ sub send_tcp { # all arguments required # sub connect_tcp { - my ($ip,$port,$timeout,$cb)=@_; - - my $s=new IO::Socket::INET (PeerAddr=>$ip,PeerPort=>$port,Blocking=>0); - defined $s or return $cb->(); - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(); - }; - my $cb_write=sub { - My::Select::timeout_cancel($cb_tmo); - $!=get_socket_error($s); - $cb->(); - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer($s,$cb_write); - return $s; + my ($ip,$port,$timeout,$cb)=@_; + + my $s=new IO::Socket::INET (PeerAddr=>$ip,PeerPort=>$port,Blocking=>0); + defined $s or return $cb->(); + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(); + }; + my $cb_write=sub { + My::Select::timeout_cancel($cb_tmo); + $!=get_socket_error($s); + $cb->(); + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer($s,$cb_write); + return $s; } # read_with_timeout($socket,$callback,$timeout) @@ -473,25 +433,25 @@ sub connect_tcp { # asynchronously read from tcp socket. # sub read_with_timeout { - my ($s,$cb,$timeout)=@_; - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(undef); - }; - my $cb_read=sub { - My::Select::timeout_cancel($cb_tmo); - my $buf=''; - my $l=sysread($s,$buf,102400,0); - if (!defined $l) { - $cb->(undef); - } else { - $!=0; - $cb->($buf); - } - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::reader($s,$cb_read) + my ($s,$cb,$timeout)=@_; + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(undef); + }; + my $cb_read=sub { + My::Select::timeout_cancel($cb_tmo); + my $buf=''; + my $l=sysread($s,$buf,102400,0); + if (!defined $l) { + $cb->(undef); + } else { + $!=0; + $cb->($buf); + } + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::reader($s,$cb_read) } #-------------------------------------- @@ -501,7 +461,7 @@ package My::Cluster::Updown; # monitor nodes # -our %H; # ( name=> [state , last_seen , expect_seq , @data ] , ... ) +our %H; # ( name=> [state , last_seen , expect_seq , @data ] , ... ) our $MONITOR_STARTING=1; our %DUPLICATES; @@ -516,139 +476,138 @@ our %DUPLICATES; use Storable; sub save_state { - store \%H,'/root/clusterd.monitor.state'; + store \%H,'/root/clusterd.monitor.state'; } sub restore_state { - -e '/root/clusterd.monitor.state' and %H=%{retrieve '/root/clusterd.monitor.state'}; - for (values %H) { - $_->[1]=0; - } + -e '/root/clusterd.monitor.state' and %H=%{retrieve '/root/clusterd.monitor.state'}; + for (values %H) { + $_->[1]=0; + } } sub status { - my ($f)=@_; - my (@up,@down); - for (sort keys %H) { push @up, $_ if $H{$_}->[0] eq 'UP' } - for (sort keys %H) { push @down,$_ if $H{$_}->[0] eq 'DOWN' } - - $f->printf("UP (%d): %s\n\n",scalar(@up),join(' ',@up)); - $f->printf("DOWN (%d): %s\n\n",scalar(@down),join(' ',@down)); - - my (@k,$max); - - @k=sort({$H{$b}->[3] <=> $H{$a}->[3]} grep({$H{$_}->[0] eq 'UP'} (keys %H))); - $max=0; - $f->print("TOP 10: "); - for (@k) { - $f->print(sprintf "%s (%4.2f) ",$_,$H{$_}->[3]); - last if $max++>=10; - } - $f->print("\n\n"); - - @k=sort({($H{$b}->[5]||0) <=> ($H{$a}->[5]||0)} grep({$H{$_}->[0] eq 'UP'} (keys %H))); - $max=0; - $f->print("TOP 10 CPU load : "); - for (@k) { - $f->print(sprintf "%s (%3.1f%%) ",$_,($H{$_}->[5]||0)*100); - last if $max++>=10; - } - $f->print("\n\n"); - - @k=sort({($H{$b}->[6]||0) <=> ($H{$a}->[6]||0)} grep({$H{$_}->[0] eq 'UP'} (keys %H))); - $max=0; - my $total_bogomips=0; - $f->print("TOP 10 free capacity : "); - for (@k) { - my $bogo=$H{$_}->[6]||0; - $f->print(sprintf "%s (%3.1f) ",$_,$bogo) if $max++<10; - $total_bogomips+=$bogo - } - $f->printf("\ntotal available bogomips: %d\n\n",$total_bogomips); + my ($f)=@_; + my (@up,@down); + for (sort keys %H) { push @up, $_ if $H{$_}->[0] eq 'UP' } + for (sort keys %H) { push @down,$_ if $H{$_}->[0] eq 'DOWN' } + + $f->printf("UP (%d): %s\n\n",scalar(@up),join(' ',@up)); + $f->printf("DOWN (%d): %s\n\n",scalar(@down),join(' ',@down)); + + my (@k,$max); + + @k=sort({$H{$b}->[3] <=> $H{$a}->[3]} grep({$H{$_}->[0] eq 'UP'} (keys %H))); + $max=0; + $f->print("TOP 10: "); + for (@k) { + $f->print(sprintf "%s (%4.2f) ",$_,$H{$_}->[3]); + last if $max++>=10; + } + $f->print("\n\n"); + + @k=sort({($H{$b}->[5]||0) <=> ($H{$a}->[5]||0)} grep({$H{$_}->[0] eq 'UP'} (keys %H))); + $max=0; + $f->print("TOP 10 CPU load : "); + for (@k) { + $f->print(sprintf "%s (%3.1f%%) ",$_,($H{$_}->[5]||0)*100); + last if $max++>=10; + } + $f->print("\n\n"); + + @k=sort({($H{$b}->[6]||0) <=> ($H{$a}->[6]||0)} grep({$H{$_}->[0] eq 'UP'} (keys %H))); + $max=0; + my $total_bogomips=0; + $f->print("TOP 10 free capacity : "); + for (@k) { + my $bogo=$H{$_}->[6]||0; + $f->print(sprintf "%s (%3.1f) ",$_,$bogo) if $max++<10; + $total_bogomips+=$bogo + } + $f->printf("\ntotal available bogomips: %d\n\n",$total_bogomips); } sub msg_text { - my ($text)=@_; - warn($text."\n"); - my $time=scalar(localtime); - main::mgmt_print_all($time.': '.$text."\n"); + my ($text)=@_; + warn($text."\n"); + my $time=scalar(localtime); + main::mgmt_print_all($time.': '.$text."\n"); } sub msg_state { - my ($state_old,$state_new,$hostname,$extra)=@_; - msg_text(sprintf "%-4s -> %-4s %-20s %s",$state_old,$state_new,$hostname,$extra); + my ($state_old,$state_new,$hostname,$extra)=@_; + msg_text(sprintf "%-4s -> %-4s %-20s %s",$state_old,$state_new,$hostname,$extra); } sub init { - restore_state; - msg_text('node monitor: started. (recovery mode)'); - My::Select::timeout(630,sub{$MONITOR_STARTING=0;msg_text('node monitor: recovery finished');}); - My::Select::timeout(630,\&timeout_hosts); + restore_state; + msg_text('node monitor: started. (recovery mode)'); + My::Select::timeout(630,sub{$MONITOR_STARTING=0;msg_text('node monitor: recovery finished');}); + My::Select::timeout(630,\&timeout_hosts); } sub timeout_hosts { - My::Select::timeout_requeue(60); - my $timeout=My::Select::time()-1230; # 2x10 minutes + 30 seconds + My::Select::timeout_requeue(60); + my $timeout=My::Select::time()-1230; # 2x10 minutes + 30 seconds - for (keys %H) { - my $h=$H{$_}; - if ($h->[0] eq 'UP' && $h->[1]<=$timeout) { - msg_state('UP','DOWN',$_,"timeout!"); - $h->[0]='DOWN'; - } - } - save_state(); + for (keys %H) { + my $h=$H{$_}; + if ($h->[0] eq 'UP' && $h->[1]<=$timeout) { + msg_state('UP','DOWN',$_,"timeout!"); + $h->[0]='DOWN'; + } + } + save_state(); } sub rx_hostannounce { - # ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev) - - my ($host,$seq,@more)=@_; - my ($load_average,$version,$pload,$pcapacity,$unixrev)=@more; - $unixrev ||= '?'; - unless (exists $H{$host}) { - if ($seq==0) { - msg_state('NEW','UP',$host,"discovered new node $version - $unixrev"); - } else { - $MONITOR_STARTING or msg_state('NEW','UP',$host,"discovered running node $version - $unixrev"); - } - $H{$host}=['UP',My::Select::time,$seq+1,@more]; - } else { - my $h=$H{$host}; - if ($h->[0] eq 'UP') { - if ($seq == $h->[2]) { - ; - } else { - if ($seq==0) { - # msg_state('UP','UP',$host,"node rebootet $version - $unixrev"); - - } elsif ($seq>$h->[2]) { - $MONITOR_STARTING or msg_state('UP','UP',$host,$seq-$h->[2]." packet(s) lost!"); - } elsif ($seq+1==$h->[2]) { - warn "DUPLICATE from $host\n"; - $DUPLICATES{$host}=($DUPLICATES{$host}||0)+1; - } else { - msg_state('UP','UP',$host,"node rebooted and ".$seq." packet(s) lost!"); - } - } - } else { - if ($seq==0) { - msg_state('DOWN','UP',$host,"node rebooted $version - $unixrev"); - } elsif ($seq==$h->[2]) { - msg_state('DOWN','UP',$host,"sequence error. Node to slow? (seq=$seq,exp=".$h->[2].")"); - } else { - msg_state('DOWN','UP',$host,$seq-$h->[2]." packet(s) lost"); - } - } - @$h=('UP',My::Select::time,$seq+1,@more); - } + # ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev) + + my ($host,$seq,@more)=@_; + my ($load_average,$version,$pload,$pcapacity,$unixrev)=@more; + $unixrev ||= '?'; + unless (exists $H{$host}) { + if ($seq==0) { + msg_state('NEW','UP',$host,"discovered new node $version - $unixrev"); + } else { + $MONITOR_STARTING or msg_state('NEW','UP',$host,"discovered running node $version - $unixrev"); + } + $H{$host}=['UP',My::Select::time,$seq+1,@more]; + } else { + my $h=$H{$host}; + if ($h->[0] eq 'UP') { + if ($seq == $h->[2]) { + ; + } else { + if ($seq==0) { + # msg_state('UP','UP',$host,"node rebootet $version - $unixrev"); + } elsif ($seq>$h->[2]) { + $MONITOR_STARTING or msg_state('UP','UP',$host,$seq-$h->[2]." packet(s) lost!"); + } elsif ($seq+1==$h->[2]) { + warn "DUPLICATE from $host\n"; + $DUPLICATES{$host}=($DUPLICATES{$host}||0)+1; + } else { + msg_state('UP','UP',$host,"node rebooted and ".$seq." packet(s) lost!"); + } + } + } else { + if ($seq==0) { + msg_state('DOWN','UP',$host,"node rebooted $version - $unixrev"); + } elsif ($seq==$h->[2]) { + msg_state('DOWN','UP',$host,"sequence error. Node to slow? (seq=$seq,exp=".$h->[2].")"); + } else { + msg_state('DOWN','UP',$host,$seq-$h->[2]." packet(s) lost"); + } + } + @$h=('UP',My::Select::time,$seq+1,@more); + } } sub delete_host { - my ($host)=@_; - my $h=delete $H{$host} or return; - msg_text("host $host removed from monitor"); + my ($host)=@_; + my $h=delete $H{$host} or return; + msg_text("host $host removed from monitor"); } #----------------------------------------------------------------------- @@ -662,68 +621,68 @@ our $TCP_MAX=1024; our $DAY_LAST_MSG; sub day { - my @f=localtime; - return sprintf "%04d%02d%02d",$f[5]+1900,$f[4]+1,$f[3]; + my @f=localtime; + return sprintf "%04d%02d%02d",$f[5]+1900,$f[4]+1,$f[3]; } sub bigben { - my $day=day(); - $day le $DAY_LAST_MSG and return; - syslog('warning', "NETLOG ==================================================== morning has broken ====\n"); - $DAY_LAST_MSG=$day; + my $day=day(); + $day le $DAY_LAST_MSG and return; + syslog('warning', "NETLOG ==================================================== morning has broken ====\n"); + $DAY_LAST_MSG=$day; } sub bigben_timer { - bigben(); - My::Select::timeout_requeue(30); + bigben(); + My::Select::timeout_requeue(30); } sub bigben_init { - $DAY_LAST_MSG=day(); - My::Select::timeout(30,\&bigben_timer); + $DAY_LAST_MSG=day(); + My::Select::timeout(30,\&bigben_timer); } sub receive { - my ($socket,$peernode,$bufref)=@_; - my $data; + my ($socket,$peernode,$bufref)=@_; + my $data; - bigben(); + bigben(); - defined $socket->recv($data,$TCP_MAX) or return; - # length $data or warn "$peernode: disconnect\n"; - length $data or return; + defined $socket->recv($data,$TCP_MAX) or return; + # length $data or warn "$peernode: disconnect\n"; + length $data or return; - $$bufref.=$data; + $$bufref.=$data; - while (1) { - last if length($$bufref)<2; - my $l=unpack('n',$$bufref); - # warn "wait for $l+2 bytes got ".length($$bufref)."\n"; - last if length($$bufref)<2+$l; - my $msg=substr($$bufref,2,$l); - $$bufref=substr($$bufref,2+$l); - syslog('warning', "NETLOG $msg\n") unless $msg=~/NETLOG/; - } - My::Select::reader_requeue(); + while (1) { + last if length($$bufref)<2; + my $l=unpack('n',$$bufref); + # warn "wait for $l+2 bytes got ".length($$bufref)."\n"; + last if length($$bufref)<2+$l; + my $msg=substr($$bufref,2,$l); + $$bufref=substr($$bufref,2+$l); + syslog('warning', "NETLOG $msg\n") unless $msg=~/NETLOG/; + } + My::Select::reader_requeue(); } sub connect_request { - My::Select::reader_requeue(); + My::Select::reader_requeue(); - my $socket=$listen_socket->accept(); - $socket->blocking(0); + my $socket=$listen_socket->accept(); + $socket->blocking(0); - my $peernode=$socket->peerhost; - my $buffer=''; - My::Select::reader($socket,sub{receive($socket,$peernode,\$buffer)}); - # warn "$peernode: connect\n"; + my $peernode=$socket->peerhost; + my $buffer=''; + My::Select::reader($socket,sub{receive($socket,$peernode,\$buffer)}); + # warn "$peernode: connect\n"; } sub init { - $listen_socket=new IO::Socket::INET(Proto=>'tcp',LocalPort=>1028,Listen=>1,ReuseAddr=>1) or die "$!\n"; - My::Select::reader($listen_socket,\&connect_request); - bigben_init(); + $listen_socket=new IO::Socket::INET(Proto=>'tcp',LocalPort=>1028,Listen=>1,ReuseAddr=>1) or die "$!\n"; + My::Select::reader($listen_socket,\&connect_request); + bigben_init(); } #------------------------------------------------------------------------ @@ -734,33 +693,33 @@ use Sys::Syslog; use IO::Socket::INET; use Data::Dumper; -our $UDP_MAX=1472; # for broadcast on alphas +our $UDP_MAX=1472; our $UDP_PORT=234; -our $BC_RATE=8; # packets per second broadcast -our $TCP_TIMEOUT=30; # default timeout for tcp processing +our $BC_RATE = 8; # packets per second broadcast +our $TCP_TIMEOUT=30; # default timeout for tcp processing -our (%options); # RUN OPTIONS +our (%options); # RUN OPTIONS -our $donald_s; # My::Select::INET udp socket +our $donald_s; # My::Select::INET udp socket our $my_hostname; -our $my_ip; # '141.14.12.12' +our $my_ip; # '141.14.12.12' $my_hostname=lc `/bin/hostname`; chomp($my_hostname); $my_hostname =~ s/\.molgen\.mpg\.de$//; while (1) { - my $addr=inet_aton($my_hostname); - if(defined $addr) { - $my_ip=inet_ntoa(inet_aton($my_hostname)); - } - last if defined $my_ip; - my $once; - unless ($once) { - warn "no IP (yet)= - waiting\n"; - $once++; - } - sleep 30; + my $addr=inet_aton($my_hostname); + if(defined $addr) { + $my_ip=inet_ntoa(inet_aton($my_hostname)); + } + last if defined $my_ip; + my $once; + unless ($once) { + warn "no IP (yet)= - waiting\n"; + $once++; + } + sleep 30; } our $my_unixrev; @@ -771,200 +730,223 @@ our $CLUSTER_PW; our $CLUSTER_PW_FILE='/etc/clusterd.password'; our $CLUSTER_PW_TIMESTAMP=0; -$ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}:''); # for ps , tar (gnu!) +$ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}:''); # for ps , tar (gnu!) #---------------------------------------------------------- UDP -our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234) - -our %UDP_HANDLER= -( - 'filedata' => \&udp_rx_filedata, - 'filedata.2' => \&udp_rx_filedata, - 'amdtardata' => \&udp_rx_amdtardata, - 'loadavg.2' => \&udp_rx_loadavg2, - 'restart' => \&udp_rx_restart, - 'flush-gidcache' => \&udp_rx_flush_gidcache, - 'make-automaps' => \&udp_rx_make_automaps, - 'reexport' => \&udp_rx_reexport, - 'log' => \&udp_rx_log, - 'exec' => \&udp_rx_exec, - 'push' => \&udp_rx_push, +our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234) + +our %UDP_HANDLER = ( + 'filedata' => \&udp_rx_filedata, + 'filedata.2' => \&udp_rx_filedata, + 'amdtardata' => \&udp_rx_amdtardata, + 'loadavg.2' => \&udp_rx_loadavg2, + 'restart' => \&udp_rx_restart, + 'flush-gidcache' => \&udp_rx_flush_gidcache, + 'make-automaps' => \&udp_rx_make_automaps, + 'reexport' => \&udp_rx_reexport, + 'log' => \&udp_rx_log, + 'exec' => \&udp_rx_exec, + 'exec.2' => \&udp_rx_exec2, + 'push' => \&udp_rx_push, + 'push.2' => \&udp_rx_push2, ); sub udp_message { - my ($data,$x_udp_peer_addr,$x_udp_peer_port,$donald_s)=@_; + my ($data,$x_udp_peer_addr,$x_udp_peer_port,$donald_s)=@_; - ($udp_peer_addr,$udp_peer_port)=($x_udp_peer_addr,$x_udp_peer_port); + ($udp_peer_addr,$udp_peer_port)=($x_udp_peer_addr,$x_udp_peer_port); - defined $CLUSTER_PW or return; + defined $CLUSTER_PW or return; - my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return; - $UDP_HANDLER{$handler_name}->(@args) if exists $UDP_HANDLER{$handler_name}; + my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return; + $UDP_HANDLER{$handler_name}->(@args) if exists $UDP_HANDLER{$handler_name}; } -sub udp_send_message { # udp_send_message( dst, @args) # dst='141.14.31.255' 'zork' '141.14.16.1' etc. data is anything - my ($ip,@args)=@_; - defined $CLUSTER_PW or return; - $donald_s->send_data($ip,$UDP_PORT,sign($CLUSTER_PW,encode(@args))); +sub udp_send_message { # udp_send_message( dst, @args) # dst='141.14.31.255' 'zork' '141.14.16.1' etc. data is anything + my ($ip,@args)=@_; + defined $CLUSTER_PW or return; + $donald_s->send_data($ip,$UDP_PORT,sign($CLUSTER_PW,encode(@args))); } #---------------------------------------------------------- sub push_amd_tar { - my ($donald_s)=@_; - my $filename='/tmp/amd.tar'; - - my $pid=fork; - defined $pid or return warn "$!\n"; - unless($pid) { - chdir '/etc/amd' or die "/etc/amd: $!\n"; - exec 'tar','cf',$filename,'.'; - die "$!\n"; - } - wait; - $? and return; - - my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; - my $digest=Digest::MD5->new->addfile($fh)->digest; - warn "tar digest is ",Digest::MD5::md5_hex($digest),"\n"; - - $pid=fork; - defined $pid or return warn "$!\n"; - unless($pid) { - exec 'gzip','-f',$filename; - die "$!\n"; - } - wait; - $? and return; - - $filename='/tmp/amd.tar.gz'; - - my $st=Donald::FileInfo->lstat($filename); - defined $st or return warn "$filename: $!\n"; - $st->type eq 'F' or return warn "$filename: not a plain file\n"; - - $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; - - my $i=0; - for (my $pos=0;$pos<$st->size;$pos+=1024) { - my $data; - defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; - # warn "send bytes $pos to ",$pos+length($data),"\n"; - ++$i % $BC_RATE or sleep 1; - udp_broadcast_message($donald_s,'amdtardata',$st,$pos,$data,$digest); - } + my ($donald_s)=@_; + my $filename='/tmp/amd.tar'; + + my $pid=fork; + defined $pid or return warn "$!\n"; + unless($pid) { + chdir '/etc/amd' or die "/etc/amd: $!\n"; + exec 'tar','cf',$filename,'.'; + die "$!\n"; + } + wait; + $? and return; + + my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; + my $digest=Digest::MD5->new->addfile($fh)->digest; + warn "tar digest is ",Digest::MD5::md5_hex($digest),"\n"; + + $pid=fork; + defined $pid or return warn "$!\n"; + unless($pid) { + exec 'gzip','-f',$filename; + die "$!\n"; + } + wait; + $? and return; + + $filename='/tmp/amd.tar.gz'; + + my $st=Donald::FileInfo->lstat($filename); + defined $st or return warn "$filename: $!\n"; + $st->type eq 'F' or return warn "$filename: not a plain file\n"; + + $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; + + my $i=0; + for (my $pos=0;$pos<$st->size;$pos+=1024) { + my $data; + defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; + # warn "send bytes $pos to ",$pos+length($data),"\n"; + ++$i % $BC_RATE or sleep 1; + udp_broadcast_message($donald_s,'amdtardata',$st,$pos,$data,$digest); + } } sub push_file { - my ($donald_s,$filename)=@_; - - $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; - - my $st=Donald::FileInfo->lstat($filename); - defined $st or return warn "$filename: $!\n"; - my $rpc; - if ($st->type eq 'F') { - $rpc='filedata'; - $st->size<=80000 or die "$filename: to big for broadcast (max 80000 bytes)\n"; - if ($st->size==0) { - udp_broadcast_message($donald_s,$rpc,$st,0,''); - return; - } - - my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; - my $i=0; - for (my $pos=0;$pos<$st->size;$pos+=1024) { - my $data; - defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; - # warn "send bytes $pos to ",$pos+length($data),"\n"; - udp_broadcast_message($donald_s,$rpc,$st,$pos,$data); - ++$i % $BC_RATE or sleep 1; - } - } elsif ($st->type eq 'L') { - $rpc='filedata.2'; - udp_broadcast_message($donald_s,$rpc,$st,0,''); - return; - } else { - die "file type not supported\n"; - } - -} - -our %CMD=( - 'mkmotd'=>'/usr/sbin/mkmotd.pl', + my ($donald_s,$filename)=@_; + + $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; + + my $st=Donald::FileInfo->lstat($filename); + defined $st or return warn "$filename: $!\n"; + my $rpc; + if ($st->type eq 'F') { + $rpc='filedata'; + $st->size<=80000 or die "$filename: to big for broadcast (max 80000 bytes)\n"; + if ($st->size==0) { + udp_broadcast_message($donald_s,$rpc,$st,0,''); + return; + } + + my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; + my $i=0; + for (my $pos=0;$pos<$st->size;$pos+=1024) { + my $data; + defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; + # warn "send bytes $pos to ",$pos+length($data),"\n"; + udp_broadcast_message($donald_s,$rpc,$st,$pos,$data); + ++$i % $BC_RATE or sleep 1; + } + } elsif ($st->type eq 'L') { + $rpc='filedata.2'; + udp_broadcast_message($donald_s,$rpc,$st,0,''); + return; + } else { + die "file type not supported\n"; + } + +} + +our %CMD = ( + 'mkmotd' => '/usr/sbin/mkmotd.pl', + 'flush-gidcache' => 'date -d tomorrow +%s > /proc/net/rpc/auth.unix.gid/flush', + 'reexport' => '/usr/bin/mxmount --reexport-only', + 'make-automaps' => '/sbin/make-automaps', ); sub send_exec { - my ($donald_s,$cmd)=@_; - unless (exists $CMD{$cmd}) { - die "available commands: ",join(' , ',keys %CMD),"\n"; - } - udp_broadcast_message($donald_s,'exec',$cmd); + my ($donald_s,$cmd)=@_; + unless (exists $CMD{$cmd}) { + die "available commands: ",join(' , ',keys %CMD),"\n"; + } + udp_broadcast_message($donald_s,'exec',$cmd); } sub udp_rx_exec { - my ($cmd)=@_; - - warn "exec $cmd\n"; - exists $CMD{$cmd} or return; - - my $pid; - $pid=fork; - unless (defined $pid) { - warn "$!\n"; - return; - } - unless ($pid) { - $pid=fork; - defined $pid or exit 1; - $pid and exit; - - open STDIN,'<','/dev/null'; - open STDOUT,'>','/dev/null'; - open STDERR,'>','/dev/null'; - alarm(60); - chdir '/'; - exec '/bin/sh','-c',$CMD{$cmd}; - exit 1; - } - wait; + my ($cmd)=@_; + + warn "exec $cmd\n"; + exists $CMD{$cmd} or return; + + my $pid; + $pid=fork; + unless (defined $pid) { + warn "$!\n"; + return; + } + unless ($pid) { + $pid=fork; + defined $pid or exit 1; + $pid and exit; + + open STDIN,'<','/dev/null'; + open STDOUT,'>','/dev/null'; + open STDERR,'>','/dev/null'; + alarm(60); + chdir '/'; + exec '/bin/sh','-c',$CMD{$cmd}; + exit 1; + } + wait; +} + +sub udp_rx_exec2 { + my @cmd = @_; + my $pid = fork; + unless (defined $pid) { + warn "$!\n"; + return; + } + if ($pid == 0) { + open STDIN,'<','/dev/null'; + alarm(60); + chdir '/'; + for my $cmd (@cmd) { + exists $CMD{$cmd} and warn "executing $CMD{$cmd}\n"; + exists $CMD{$cmd} and system '/bin/sh', '-c', $CMD{$cmd}; + } + exit; + } } #------------------------------------------------------------- -sub normalize_seg { # [pos,len],[pos,len],... - my @s=sort {$a->[0] <=> $b->[0]} @_; +sub normalize_seg { # [pos,len],[pos,len],... + my @s=sort {$a->[0] <=> $b->[0]} @_; - my $i=0; - while ($i<$#s) { - # is element $i joinable with next element + my $i=0; + while ($i<$#s) { + # is element $i joinable with next element - my $end_0=$s[$i]->[0]+$s[$i]->[1]; - if ($end_0 >= $s[$i+1]->[0] ) { - my $end_1=$s[$i+1]->[0]+$s[$i+1]->[1]; - $s[$i]->[1] = ($end_0>$end_1 ? $end_0 : $end_1)-$s[$i]->[0]; - splice @s,$i+1,1; - } else { - $i++; - } - } - return @s; + my $end_0=$s[$i]->[0]+$s[$i]->[1]; + if ($end_0 >= $s[$i+1]->[0] ) { + my $end_1=$s[$i+1]->[0]+$s[$i+1]->[1]; + $s[$i]->[1] = ($end_0>$end_1 ? $end_0 : $end_1)-$s[$i]->[0]; + splice @s,$i+1,1; + } else { + $i++; + } + } + return @s; } -my %RECEIVER; # ( filename => $receiver, .... ) +my %RECEIVER; # ( filename => $receiver, .... ) # $receiver : [ st_want , last_rx , io_handle , [ [pos,len] , [pos,len] , ... ] ] sub purge_old_receiver { - while (my ($n,$v)=each %RECEIVER) { - if ($v->[1]+10[0]->name,"\n"; - log_to_stat_target('timeout receiving ',$v->[0]->name); - delete $RECEIVER{$n}; - } - } - My::Select::timeout_requeue(60); + while (my ($n,$v)=each %RECEIVER) { + if ($v->[1]+10[0]->name,"\n"; + log_to_stat_target('timeout receiving ',$v->[0]->name); + delete $RECEIVER{$n}; + } + } + My::Select::timeout_requeue(60); } #------------------------------------------------------------- @@ -974,239 +956,217 @@ our $INSTALLED_DIGEST=''; our $rx_filedata_done; sub udp_rx_amdtardata { - my ($st_want,$pos,$data,$digest)=@_; + my ($st_want,$pos,$data,$digest)=@_; - ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo'; + ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo'; - ### $digest eq $INSTALLED_DIGEST and $pos==0 and warn "/etc/amd - ",Digest::MD5::md5_hex($digest)," already installed\n"; - $digest eq $INSTALLED_DIGEST and return; + ### $digest eq $INSTALLED_DIGEST and $pos==0 and warn "/etc/amd - ",Digest::MD5::md5_hex($digest)," already installed\n"; + $digest eq $INSTALLED_DIGEST and return; - #### $pos==0 and warn "receiving /etc/amd - ",Digest::MD5::md5_hex($digest),"\n"; + #### $pos==0 and warn "receiving /etc/amd - ",Digest::MD5::md5_hex($digest),"\n"; - udp_rx_filedata($st_want,$pos,$data); - if ($rx_filedata_done) { - my $pid=fork; - defined $pid or return warn "$!\n"; - unless($pid) { - chdir '/etc/amd' or die "/etc/amd: $!\n"; - exec 'tar','xzf',$st_want->name; - die "$!\n"; - } - } - wait; - $? and return; + udp_rx_filedata($st_want,$pos,$data); + if ($rx_filedata_done) { + my $pid=fork; + defined $pid or return warn "$!\n"; + unless($pid) { + chdir '/etc/amd' or die "/etc/amd: $!\n"; + exec 'tar','xzf',$st_want->name; + die "$!\n"; + } + } + wait; + $? and return; - warn "installed /etc/amd - ",Digest::MD5::md5_hex($digest),"\n"; - $INSTALLED_DIGEST=$digest; - system '/sbin/make-automaps'; + warn "installed /etc/amd - ",Digest::MD5::md5_hex($digest),"\n"; + $INSTALLED_DIGEST=$digest; + system '/sbin/make-automaps'; } -our ($machine,$SYS_lchown,$SYS_mknod,$lmtime_sub); +our ($machine,$SYS_lchown,$SYS_mknod); our ($SYS_utimensat,$AT_FDCWD,$UTIME_OMIT,$AT_SYMLINK_NOFOLLOW); -sub lmtime_unsupported { - my ($path,$mtime)=@_; - warn "$path: don't known how to change symlink mtime on target architecture\n"; -} -sub lmtime_utimensat { - my ($path,$mtime)=@_; - my $tsa=pack 'qqqq',0,$UTIME_OMIT,$mtime,0; - syscall($SYS_utimensat,$AT_FDCWD,$path,$tsa,$AT_SYMLINK_NOFOLLOW)==0 or return warn "$path: failed to lmtime: $!\n"; -} -$lmtime_sub=\&lmtime_unsupported; - chomp($machine=`uname -m`); -if ($machine eq 'i686') { - $SYS_lchown=198; # __NR_lchown32 in /usr/include/asm/unistd.h - $SYS_mknod=14; # __NR_mknod -} elsif ($machine eq 'x86_64') { - $SYS_lchown=94; # __NR_lchown in /usr/include/asm-x86_64/unistd.h - $SYS_mknod=133; # __NR_mknod - - $SYS_utimensat=280; # /usr/include/asm/unistd_64.h - $AT_FDCWD=-100; # /usr/include/fcntl.h - $UTIME_OMIT=(1<<30)-2; # /usr/include/bits/stat.h - $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/fcntl.h - $lmtime_sub=\&lmtime_utimensat; -} elsif ($machine eq 'alpha') { - $SYS_lchown=208; # SYS_lchown in /usr/include/syscall.h - $SYS_mknod=14; # SYS_mknod -} elsif ($machine eq 'amd64') { - $SYS_lchown=254; # SYS_lchown in /usr/include/syscall.h - $SYS_mknod=14; # SYS_mknod +if ($machine eq 'x86_64') { + $SYS_lchown=94; # __NR_lchown in /usr/include/asm-x86_64/unistd.h + $SYS_mknod=133; # __NR_mknod + + $SYS_utimensat=280; # /usr/include/asm/unistd_64.h + $AT_FDCWD=-100; # /usr/include/fcntl.h + $UTIME_OMIT=(1<<30)-2; # /usr/include/bits/stat.h + $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/fcntl.h } elsif ($machine eq 'ppc64le') { - $SYS_lchown=16; # __NR_lchown in /usr/include/powerpc64le-linux-gnu/asm/unistd.h - $SYS_mknod=14; # __NR_mknod - - $SYS_utimensat=304; # __NR_utimensat in /usr/include/powerpc64le-linux-gnu/asm/unistd.h - $AT_FDCWD=-100; # /usr/include/linux/fcntl.h - $UTIME_OMIT=(1<<30)-2; # /usr/include/powerpc64le-linux-gnu/bits/stat.h - $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/linux/fcntl.h - $lmtime_sub=\&lmtime_utimensat; + $SYS_lchown=16; # __NR_lchown in /usr/include/powerpc64le-linux-gnu/asm/unistd.h + $SYS_mknod=14; # __NR_mknod + + $SYS_utimensat=304; # __NR_utimensat in /usr/include/powerpc64le-linux-gnu/asm/unistd.h + $AT_FDCWD=-100; # /usr/include/linux/fcntl.h + $UTIME_OMIT=(1<<30)-2; # /usr/include/powerpc64le-linux-gnu/bits/stat.h + $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/linux/fcntl.h } else { - warn "unknown machine type $machine: symlink ownership can't be set.\n"; - warn "unknown machine type $machine: named pipes,character and block devices can't be created\n"; + warn "unknown machine type $machine: symlink ownership can't be set.\n"; + warn "unknown machine type $machine: named pipes,character and block devices can't be created\n"; } sub lchown { - my ($uid,$gid,$path)=@_; - $SYS_lchown or return; - syscall($SYS_lchown,$path,$uid+0,$gid+0)==0 or return warn "$path: failed to lchown: $!\n"; + my ($uid,$gid,$path)=@_; + $SYS_lchown or return; + syscall($SYS_lchown,$path,$uid+0,$gid+0)==0 or return warn "$path: failed to lchown: $!\n"; } sub lmtime { - my ($mtime,$path)=@_; - $lmtime_sub or return; - $lmtime_sub->($path,$mtime); + my ($mtime,$path)=@_; + my $tsa=pack 'qqqq',0,$UTIME_OMIT,$mtime,0; + syscall($SYS_utimensat,$AT_FDCWD,$path,$tsa,$AT_SYMLINK_NOFOLLOW)==0 or return warn "$path: failed to lmtime: $!\n"; } sub udp_rx_filedata { # set rx_filedata_done as a side effect - my ($st_want,$pos,$data)=@_; - - ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo'; - - my $filename=$st_want->name; - my $tmp_filename="$filename.tmp"; - - $rx_filedata_done=0; - - my $st_is=Donald::FileInfo->lstat($st_want->name); - if ($st_is && $st_is->type eq 'F' && $st_is->size==$st_want->size && $st_is->mtime==$st_want->mtime) { - #### $pos==0 and warn " $filename seems to be current\n"; - return; - } - - if ($st_want->type eq 'L') { - if (!$st_is || $st_is->type ne 'L' || $st_is->target ne $st_want->target) { - $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n"); - symlink($st_want->target,$filename) or return warn "$filename: failed to create symlink: $!\n"; - lchown($st_want->uid,$st_want->gid,$filename); - lmtime($st_want->mtime,$filename); - warn "installed $filename -> ".$st_want->target."\n"; - } else { - if ($st_is->uid != $st_want->uid || $st_is->gid != $st_want->gid) { - lchown($st_want->uid,$st_want->gid,$filename); - } - if ($st_is->mtime != $st_want->mtime) { - lmtime($st_want->mtime,$filename); - } - } - return; - } - - if (length($data) == $st_want->size) { - # complete file in one broadcast - -e $tmp_filename and unlink($tmp_filename); - my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - if ($st_want->size) { - $fh->syswrite($data) or return warn "$tmp_filename: $!\n"; - $fh->sync(); - } - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; - warn "installed $filename\n"; - $rx_filedata_done=1; - return; - } - - length($data) or return; # shouldn't happen. - - my $receiver=$RECEIVER{$st_want->name}; - - if (defined $receiver) { - if ( $receiver->[0]->size != $st_want->size or $receiver->[0]->mtime != $st_want->mtime ) { - $receiver=undef; - } - } - - unless (defined $receiver) { - # create new receiver - ## warn "start receiving $filename from $udp_peer_addr\n"; - -e $tmp_filename and unlink($tmp_filename); - my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - $receiver = [$st_want,My::Select::time,$fh,[]]; - $RECEIVER{$filename}=$receiver; - } - - { - # warn "$filename: receive $pos length ",length($data),"\n"; - - # write data ( size cant be 0 here ) - $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n"; - $receiver->[2]->print($data) or return warn "$tmp_filename: $!\n";; - $receiver->[1]=My::Select::time; - my $s=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])]; - - #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n"; - - # all there ? - - if (@$s == 1 && $s->[0]->[0]==0 && $s->[0]->[1]==$st_want->size) { - $receiver->[2]->flush(); - $receiver->[2]->sync(); - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; - warn "installed $filename\n"; - delete $RECEIVER{$filename}; - $rx_filedata_done=1; - } - } + my ($st_want,$pos,$data)=@_; + + ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo'; + + my $filename=$st_want->name; + my $tmp_filename="$filename.tmp"; + + $rx_filedata_done=0; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + if ($st_is && $st_is->type eq 'F' && $st_is->size==$st_want->size && $st_is->mtime==$st_want->mtime) { + #### $pos==0 and warn " $filename seems to be current\n"; + return; + } + + if ($st_want->type eq 'L') { + if (!$st_is || $st_is->type ne 'L' || $st_is->target ne $st_want->target) { + $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n"); + symlink($st_want->target,$filename) or return warn "$filename: failed to create symlink: $!\n"; + lchown($st_want->uid,$st_want->gid,$filename); + lmtime($st_want->mtime,$filename); + warn "installed $filename -> ".$st_want->target."\n"; + } else { + if ($st_is->uid != $st_want->uid || $st_is->gid != $st_want->gid) { + lchown($st_want->uid,$st_want->gid,$filename); + } + if ($st_is->mtime != $st_want->mtime) { + lmtime($st_want->mtime,$filename); + } + } + return; +} + + if (length($data) == $st_want->size) { + # complete file in one broadcast + -e $tmp_filename and unlink($tmp_filename); + my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + defined $fh or return warn "$tmp_filename: $!\n"; + if ($st_want->size) { + $fh->syswrite($data) or return warn "$tmp_filename: $!\n"; + $fh->sync(); + } + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; + warn "installed $filename\n"; + $rx_filedata_done=1; + return; + } + + length($data) or return; # shouldn't happen. + + my $receiver=$RECEIVER{$st_want->name}; + + if (defined $receiver) { + if ( $receiver->[0]->size != $st_want->size or $receiver->[0]->mtime != $st_want->mtime ) { + $receiver=undef; + } + } + + unless (defined $receiver) { + # create new receiver + ## warn "start receiving $filename from $udp_peer_addr\n"; + -e $tmp_filename and unlink($tmp_filename); + my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + defined $fh or return warn "$tmp_filename: $!\n"; + $receiver = [$st_want,My::Select::time,$fh,[]]; + $RECEIVER{$filename}=$receiver; + } + + { + # warn "$filename: receive $pos length ",length($data),"\n"; + + # write data ( size cant be 0 here ) + $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n"; + $receiver->[2]->print($data) or return warn "$tmp_filename: $!\n";; + $receiver->[1]=My::Select::time; + my $s=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])]; + + #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n"; + + # all there ? + + if (@$s == 1 && $s->[0]->[0]==0 && $s->[0]->[1]==$st_want->size) { + $receiver->[2]->flush(); + $receiver->[2]->sync(); + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; + warn "installed $filename\n"; + delete $RECEIVER{$filename}; + $rx_filedata_done=1; + } + } } #----------------------------------------------------------- sample running proc every 10 seconds -our @proc_running=(0)x12; # 12 samples every 5 seconds -> minute average -our @proc_running_10=(0)x10; # 10 samples every minute -> 10 minute average +our @proc_running=(0)x12; # 12 samples every 5 seconds -> minute average +our @proc_running_10=(0)x10; # 10 samples every minute -> 10 minute average our $SAMPLE_TICK=0; sub xget_cur_running_proc { - open L,'<','/proc/stat' or return 0; - while () { - # warn $_; - return $1 if /^procs_running\s*(\d+)/; - } - return 0; + open L,'<','/proc/stat' or return 0; + while () { + # warn $_; + return $1 if /^procs_running\s*(\d+)/; + } + return 0; } sub get_cur_running_proc { - our $CPUS; - my $r=xget_cur_running_proc(); - if ($CPUS && $r>$CPUS+1) { - pload_debug("more than $CPUS+1 proc"); - } - return $r; + our $CPUS; + my $r=xget_cur_running_proc(); + if ($CPUS && $r>$CPUS+1) { + pload_debug("more than $CPUS+1 proc"); + } + return $r; } sub running_proc { - my $ret=0; - $ret+=$_ for @proc_running; - return $ret/@proc_running; + my $ret=0; + $ret+=$_ for @proc_running; + return $ret/@proc_running; } sub running_proc_10 { - my $ret=0; - $ret+=$_ for @proc_running_10; - return $ret/@proc_running_10; + my $ret=0; + $ret+=$_ for @proc_running_10; + return $ret/@proc_running_10; } -sub sample_rproc { # every 5 seconds - @proc_running=(@proc_running[1..@proc_running-1],get_cur_running_proc()-1); - if ($SAMPLE_TICK<12) { - $SAMPLE_TICK++; - } else { - @proc_running_10=(@proc_running_10[1..@proc_running_10-1],running_proc()); - $SAMPLE_TICK=0; - } - My::Select::timeout_requeue(5); +sub sample_rproc { # every 5 seconds + @proc_running=(@proc_running[1..@proc_running-1],get_cur_running_proc()-1); + if ($SAMPLE_TICK<12) { + $SAMPLE_TICK++; + } else { + @proc_running_10=(@proc_running_10[1..@proc_running_10-1],running_proc()); + $SAMPLE_TICK=0; + } + My::Select::timeout_requeue(5); } #----------------------------------------------------------- stat @@ -1214,93 +1174,79 @@ sub sample_rproc { # every 5 seconds our ($CPUS,$BOGOMIPS); sub init_cpuinfo { - Donald::Tools::is_alpha and return; - open L,'<','/proc/cpuinfo' or return; - while () { - if (/^bogomips\s*:\s*([\d.]+)/) {$CPUS++;$BOGOMIPS+=$1;} - } -} - -sub loadavg { # AXP : (system load average) , LINUX: (system load average, pload, freebogo) - if (Donald::Tools::is_alpha) { - my $data=pack 'l!3lx4l!3'; - my $i=syscall(85,3,0,$data,1,length($data)); # table(id=TBL_LOADAVG,index=0,addr=data,nel=1,lel=56) - my ($l0,$l1,$l2,$scale,$mach0,$mach1,$mach2)=unpack 'l!3lx4l!3',$data; - $scale or return undef; - return $l0/$scale; - } else { - $CPUS or init_cpuinfo(); - my $running_proc=running_proc(); - open L,'<','/proc/loadavg' or return undef; - my $data; - sysread(L,$data,1024); - close L; - $data =~ /^(\d+\.?\d*)/ or return undef; # 5 min loadavg - if ($CPUS) { - return ($1+0,($running_proc)/$CPUS,($running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS)); - } else { - return $1+0; - } - } + open L,'<','/proc/cpuinfo' or return; + while () { + if (/^bogomips\s*:\s*([\d.]+)/) {$CPUS++;$BOGOMIPS+=$1;} + } +} + +sub loadavg { # AXP : (system load average) , LINUX: (system load average, pload, freebogo) + $CPUS or init_cpuinfo(); + my $running_proc=running_proc(); + open L,'<','/proc/loadavg' or return undef; + my $data; + sysread(L,$data,1024); + close L; + $data =~ /^(\d+\.?\d*)/ or return undef; # 5 min loadavg + if ($CPUS) { + return ($1+0,($running_proc)/$CPUS,($running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS)); + } else { + return $1+0; + } } our $STAT_TARGET='afk'; our $STAT_SEQ=0; sub send_stat { - My::Select::timeout_requeue(600); - my ($load_avg,$pload,$pcapacity)=loadavg(); - defined $load_avg or return; - udp_send_message($STAT_TARGET,'loadavg.2',$my_hostname,$STAT_SEQ++,$load_avg,version_info(),$pload,$pcapacity,$my_unixrev); + My::Select::timeout_requeue(600); + my ($load_avg,$pload,$pcapacity)=loadavg(); + defined $load_avg or return; + udp_send_message($STAT_TARGET,'loadavg.2',$my_hostname,$STAT_SEQ++,$load_avg,version_info(),$pload,$pcapacity,$my_unixrev); } sub udp_rx_loadavg2 { - my ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)=@_; - $my_hostname eq $STAT_TARGET and My::Cluster::Updown::rx_hostannounce($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev) + my ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)=@_; + $my_hostname eq $STAT_TARGET and My::Cluster::Updown::rx_hostannounce($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev) } sub udp_rx_log { - my ($msg)=@_; - My::Cluster::Updown::msg_text($msg); + my ($msg)=@_; + My::Cluster::Updown::msg_text($msg); } sub log_to_stat_target { - my ($msg)=join '',@_; - udp_send_message($STAT_TARGET,'log',"$my_hostname: $msg"); + my ($msg)=join '',@_; + udp_send_message($STAT_TARGET,'log',"$my_hostname: $msg"); } # ---------------------------------------------------------- sub udp_rx_restart { - # double-fork, because kill_previous_server() won't kill its parent - my $pid=fork; - if (defined $pid && $pid==0) { - my $pid2=fork; - if (defined $pid2 && $pid==0) { - exec '/usr/sbin/clusterd','--kill','--daemon'; - die "exec failed: $!\n"; - } - } + # just exit and leave the restart to the service manager + # exist status 40 can be used with RestartForceExitStatus= + warn "received restart request. Exiting with status 40\n"; + exit 40; } sub udp_rx_flush_gidcache { - if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') { - print $out time(); - } else { - warn "proc/net/rpc/auth.unix.gid/flush: $!\n"; - } + if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') { + print $out time(); + } else { + warn "proc/net/rpc/auth.unix.gid/flush: $!\n"; + } } sub udp_rx_make_automaps { - if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') { - print $out time(); - } else { - warn "proc/net/rpc/auth.unix.gid/flush: $!\n"; - } - system '/sbin/make-automaps'; + if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') { + print $out time(); + } else { + warn "proc/net/rpc/auth.unix.gid/flush: $!\n"; + } + system '/sbin/make-automaps'; } sub udp_rx_reexport { - system '/usr/bin/mxmount --reexport-only'; + system '/usr/bin/mxmount --reexport-only'; } #----------- tcp mgmt console ----------------------------- @@ -1311,79 +1257,79 @@ our $mgmt_listen_socket; our %mgmt_sockets; sub mgmt_init { - $mgmt_listen_socket=new IO::Socket::INET(LocalPort=>$MGMT_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); - defined $mgmt_listen_socket or die "$!\n"; - My::Select::reader($mgmt_listen_socket,\&mgmt_connect_request); + $mgmt_listen_socket=new IO::Socket::INET(LocalPort=>$MGMT_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); + defined $mgmt_listen_socket or die "$!\n"; + My::Select::reader($mgmt_listen_socket,\&mgmt_connect_request); } sub mgmt_connect_request { - My::Select::reader_requeue(); + My::Select::reader_requeue(); - # listen socket ready + # listen socket ready - my $socket=$mgmt_listen_socket->accept(); - $socket->blocking(0); - my $peernode=$socket->peerhost; + my $socket=$mgmt_listen_socket->accept(); + $socket->blocking(0); + my $peernode=$socket->peerhost; - ### warn "accepted mgmt connection from $peernode\n"; + ### warn "accepted mgmt connection from $peernode\n"; - My::Select::reader($socket,sub{mgmt_receive($socket)}); - $mgmt_sockets{$socket}=$socket; - $socket->print("clusterd ".version_info()." stupid console\n"); - $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n"); + My::Select::reader($socket,sub{mgmt_receive($socket)}); + $mgmt_sockets{$socket}=$socket; + $socket->print("clusterd ".version_info()." stupid console\n"); + $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n"); } sub mgmt_receive { - my ($s)=@_; - my $data; - my $len=$s->sysread($data,1024); - if (!defined $len or $len==0 or $len==1 && $data eq "\cD") { - delete $mgmt_sockets{$s}; - $s->close; - ###warn "lost connection to mgmt port\n"; - return; - } - - My::Select::reader_requeue(); - $data=~s/\r?\n$//; - length $data or return; - if ($data eq 'l') { - $my_hostname eq $STAT_TARGET and My::Cluster::Updown::status($s); - $s->print("AREA: ",area_config_as_string(),"\n"); - $s->print("STAT TARGET: ",$STAT_TARGET,"\n"); - $s->print(' (',scalar(localtime),')',"\n"); - } - elsif ($data eq 'r') { - for my $host (sort keys %H) { - if ($H{$host}->[0] eq 'UP') { - $s->printf("%-40s : %s\n",$host,$H{$host}->[7]); - } - } - } elsif ($data eq 'v') { - $Data::Dumper::Terse=1; - $Data::Dumper::Indent=0; - for (sort keys %H) { - $s->printf("%15s : %s\n",$_,Dumper($H{$_})); - } - } elsif ($data eq 'd') { - my $running_proc=running_proc(); - my $running_proc_10=running_proc_10(); - $s->printf("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc); - $s->printf("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10); - $CPUS or init_cpuinfo; - $CPUS and $s->printf("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS); - $CPUS and $s->printf("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS); - } elsif ($data =~ /^delete (\S+)$/) { - My::Cluster::Updown::delete_host($1); - } elsif ($data eq 'dup show') { - for my $h (sort {$DUPLICATES{$b} <=> $DUPLICATES{$a} } keys %DUPLICATES) { - $s->printf("%-10s : %5d\n",$h,$DUPLICATES{$h}); - } - } elsif ($data eq 'dup clear') { - %DUPLICATES=(); - } else { - $s->print(<<"_EOF_"); + my ($s)=@_; + my $data; + my $len=$s->sysread($data,1024); + if (!defined $len or $len==0 or $len==1 && $data eq "\cD") { + delete $mgmt_sockets{$s}; + $s->close; + ###warn "lost connection to mgmt port\n"; + return; + } + + My::Select::reader_requeue(); + $data=~s/\r?\n$//; + length $data or return; + if ($data eq 'l') { + $my_hostname eq $STAT_TARGET and My::Cluster::Updown::status($s); + $s->print("AREA: ",area_config_as_string(),"\n"); + $s->print("STAT TARGET: ",$STAT_TARGET,"\n"); + $s->print(' (',scalar(localtime),')',"\n"); + } + elsif ($data eq 'r') { + for my $host (sort keys %H) { + if ($H{$host}->[0] eq 'UP') { + $s->printf("%-40s : %s\n",$host,$H{$host}->[7]); + } + } + } elsif ($data eq 'v') { + $Data::Dumper::Terse=1; + $Data::Dumper::Indent=0; + for (sort keys %H) { + $s->printf("%15s : %s\n",$_,Dumper($H{$_})); + } + } elsif ($data eq 'd') { + my $running_proc=running_proc(); + my $running_proc_10=running_proc_10(); + $s->printf("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc); + $s->printf("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10); + $CPUS or init_cpuinfo; + $CPUS and $s->printf("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS); + $CPUS and $s->printf("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS); + } elsif ($data =~ /^delete (\S+)$/) { + My::Cluster::Updown::delete_host($1); + } elsif ($data eq 'dup show') { + for my $h (sort {$DUPLICATES{$b} <=> $DUPLICATES{$a} } keys %DUPLICATES) { + $s->printf("%-10s : %5d\n",$h,$DUPLICATES{$h}); + } + } elsif ($data eq 'dup clear') { + %DUPLICATES=(); + } else { + $s->print(<<"_EOF_"); unknown command: $data l : list status @@ -1392,135 +1338,135 @@ d : debug (cpu speed calc) r : dump unix revisions delete HOST : forget about HOST -dup show : show duplicates stat -dup clear : clear duplicates stat +dup show : show duplicates stat +dup clear : clear duplicates stat to exit use ^D _EOF_ - } + } } sub mgmt_print_all { - my ($msg)=@_; + my ($msg)=@_; - $options{'foreground'} and not $options{'syslog'} and print $msg; + $options{'foreground'} and not $options{'syslog'} and print $msg; - for my $s (values(%mgmt_sockets)) { - $s->print($msg); - } + for my $s (values(%mgmt_sockets)) { + $s->print($msg); + } } #----------------------------------------------------------- sub clp_rx_LSOF { - my ($socket,$pattern)=@_; - - my $pid=fork; - unless (defined $pid) { - warn"$!\n"; - return; - } - unless ($pid) { - my $pid=fork; - defined $pid or die "$!\n"; - unless ($pid) { - $socket->blocking(1); - # -n inhibits the conversion of network numbers to host names for network files. - # -b causes lsof to avoid kernel functions that might block - lstat(2), readlink(2), and stat(2). - # -w disables warning messages. - open P,'timeout -k 92s 90s lsof -n -b -w|' or die "$!\n"; - while (

) { - next if defined $pattern && index($_,$pattern)<0; - $socket->send(pack('n',length($_)).$_,0); - } - close P; - if ($?) { - $_=sprintf("** lsof timout/error on %s\n",$my_hostname); - $socket->send(pack('n',length($_)).$_,0); - } - close $socket; - exit; - } - exit; - } - close $socket; - wait; - return 1; + my ($socket,$pattern)=@_; + + my $pid=fork; + unless (defined $pid) { + warn"$!\n"; + return; + } + unless ($pid) { + my $pid=fork; + defined $pid or die "$!\n"; + unless ($pid) { + $socket->blocking(1); + # -n inhibits the conversion of network numbers to host names for network files. + # -b causes lsof to avoid kernel functions that might block - lstat(2), readlink(2), and stat(2). + # -w disables warning messages. + open P,'timeout -k 92s 90s lsof -n -b -w|' or die "$!\n"; + while (

) { + next if defined $pattern && index($_,$pattern)<0; + $socket->send(pack('n',length($_)).$_,0); + } + close P; + if ($?) { + $_=sprintf("** lsof timout/error on %s\n",$my_hostname); + $socket->send(pack('n',length($_)).$_,0); + } + close $socket; + exit; + } + exit; + } + close $socket; + wait; + return 1; } sub run_cmd { - my ($socket,@cmd)=@_; - my $pid=fork; - unless (defined $pid) { - warn"$!\n"; - return; - } - unless ($pid) { - my $opipe=new IO::Pipe; - my $epipe=new IO::Pipe; - my $cpid=fork; - defined $cpid or die "$!\n"; - unless ($cpid) { - warn "exec ".join(' ',@cmd)."\n"; - $opipe->writer(); - $epipe->writer(); - open STDIN,'<','/dev/null'; - open STDOUT,'>&',$opipe; - open STDERR,'>&',$epipe; - exec @cmd; - die "$!\n"; - } -# $::SIG{'CHLD'}=sub { -# my $pid=wait; -# my $buffer="X$?"; -# $socket->send(pack('n',length($buffer)).$buffer,0); -# exit; -# }; - $opipe->reader(); - $epipe->reader(); - my $ofn=$opipe->fileno; - my $efn=$epipe->fileno; - my ($rvec_in,$wvec_in,$evec_in)=('','',''); - vec($rvec_in,$ofn,1)=1; - vec($rvec_in,$efn,1)=1; - my $channel=2; - my $buffer; - $socket->blocking(1); - while (1) { - my ($rvec,$wvec,$evec)=($rvec_in,$wvec_in,$evec_in); - my $ready=select($rvec,$wvec,$evec,60); - $ready or die "timeout\n"; - if (vec($rvec,$ofn,1)) { - my $len=$opipe->sysread($buffer,1024-1-2); - defined $len or die "$!\n"; - if ($len) { - $socket->send(pack('n',$len+1)."O$buffer",0); - } else { - vec($rvec_in,$ofn,1)=0; - $opipe->close; - $channel--; - } - } - if (vec($rvec,$efn,1)) { - my $len=$epipe->sysread($buffer,1024-1-2); - defined $len or die "$!\n"; - if ($len) { - $socket->send(pack('n',$len+1)."E$buffer",0); - } else { - vec($rvec_in,$efn,1)=0; - $epipe->close; - $channel--; - } - } - if ($channel==0) { - my $pid=wait; - my $buffer="X$?"; - $socket->send(pack('n',length($buffer)).$buffer,0); - exit; - } - } - } + my ($socket,@cmd)=@_; + my $pid=fork; + unless (defined $pid) { + warn"$!\n"; + return; + } + unless ($pid) { + my $opipe=new IO::Pipe; + my $epipe=new IO::Pipe; + my $cpid=fork; + defined $cpid or die "$!\n"; + unless ($cpid) { + warn "exec ".join(' ',@cmd)."\n"; + $opipe->writer(); + $epipe->writer(); + open STDIN,'<','/dev/null'; + open STDOUT,'>&',$opipe; + open STDERR,'>&',$epipe; + exec @cmd; + die "$!\n"; + } +# $::SIG{'CHLD'}=sub { +# my $pid=wait; +# my $buffer="X$?"; +# $socket->send(pack('n',length($buffer)).$buffer,0); +# exit; +# }; + $opipe->reader(); + $epipe->reader(); + my $ofn=$opipe->fileno; + my $efn=$epipe->fileno; + my ($rvec_in,$wvec_in,$evec_in)=('','',''); + vec($rvec_in,$ofn,1)=1; + vec($rvec_in,$efn,1)=1; + my $channel=2; + my $buffer; + $socket->blocking(1); + while (1) { + my ($rvec,$wvec,$evec)=($rvec_in,$wvec_in,$evec_in); + my $ready=select($rvec,$wvec,$evec,60); + $ready or die "timeout\n"; + if (vec($rvec,$ofn,1)) { + my $len=$opipe->sysread($buffer,1024-1-2); + defined $len or die "$!\n"; + if ($len) { + $socket->send(pack('n',$len+1)."O$buffer",0); + } else { + vec($rvec_in,$ofn,1)=0; + $opipe->close; + $channel--; + } + } + if (vec($rvec,$efn,1)) { + my $len=$epipe->sysread($buffer,1024-1-2); + defined $len or die "$!\n"; + if ($len) { + $socket->send(pack('n',$len+1)."E$buffer",0); + } else { + vec($rvec_in,$efn,1)=0; + $epipe->close; + $channel--; + } + } + if ($channel==0) { + my $pid=wait; + my $buffer="X$?"; + $socket->send(pack('n',length($buffer)).$buffer,0); + exit; + } + } + } } #----------- CLP cluster protocol ----------------------------- @@ -1531,70 +1477,70 @@ our $clp_listen_socket; our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF,'PULL'=>\&clp_rx_PULL); sub clp_init { - $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>128,ReuseAddr=>1); - defined $clp_listen_socket or die "$!\n"; - My::Select::reader($clp_listen_socket,\&clp_connect_request); + $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>128,ReuseAddr=>1); + defined $clp_listen_socket or die "$!\n"; + My::Select::reader($clp_listen_socket,\&clp_connect_request); } sub clp_connect_request { - My::Select::reader_requeue(); + My::Select::reader_requeue(); - # listen socket ready + # listen socket ready - my $socket=$clp_listen_socket->accept(); - $socket->blocking(0); - my $peernode=$socket->peerhost; + my $socket=$clp_listen_socket->accept(); + $socket->blocking(0); + my $peernode=$socket->peerhost; - my $buffer=''; + my $buffer=''; - My::Select::reader($socket,sub{clp_receive($socket,\$buffer)}); + My::Select::reader($socket,sub{clp_receive($socket,\$buffer)}); } sub clp_receive { - my ($s,$bufref)=@_; - my $data; - defined $s->recv($data,$TCP_MAX) or return; - if (!length($data) ) { - $s->close; - return; - } - $$bufref.=$data; - while (1) { - last if length($$bufref)<2; - my $l=unpack('n',$$bufref); - last if length($$bufref)<2+$l; - my $msg=substr($$bufref,2,$l); - $$bufref=substr($$bufref,2+$l); - clp_message($s,$msg) and return; - } - My::Select::reader_requeue(); + my ($s,$bufref)=@_; + my $data; + defined $s->recv($data,$TCP_MAX) or return; + if (!length($data) ) { + $s->close; + return; + } + $$bufref.=$data; + while (1) { + last if length($$bufref)<2; + my $l=unpack('n',$$bufref); + last if length($$bufref)<2+$l; + my $msg=substr($$bufref,2,$l); + $$bufref=substr($$bufref,2+$l); + clp_message($s,$msg) and return; + } + My::Select::reader_requeue(); } sub clp_message { - my ($socket,$data)=@_; + my ($socket,$data)=@_; - defined $CLUSTER_PW or return; + defined $CLUSTER_PW or return; - my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return; - $CLP_HANDLER{$handler_name}->($socket,@args) if exists $CLP_HANDLER{$handler_name}; + my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return; + $CLP_HANDLER{$handler_name}->($socket,@args) if exists $CLP_HANDLER{$handler_name}; } -sub clp_send_message { # clp_send_message($socket, @args) - my ($s,@args)=@_; - defined $CLUSTER_PW or return; - my $data=sign($CLUSTER_PW,encode(@args)); - unless ($s->peername) { - return 0; - } - $s->send(pack('n',length($data)).$data); +sub clp_send_message { # clp_send_message($socket, @args) + my ($s,@args)=@_; + defined $CLUSTER_PW or return; + my $data=sign($CLUSTER_PW,encode(@args)); + unless ($s->peername) { + return 0; + } + $s->send(pack('n',length($data)).$data); } sub clp_rx_CMD { - my ($socket,@args)=@_; - run_cmd($socket,@args); - close $socket; - return 1; + my ($socket,@args)=@_; + run_cmd($socket,@args); + close $socket; + return 1; } # send_tcp_cp($socket,$cb,$timeout,@args) @@ -1604,127 +1550,133 @@ sub clp_rx_CMD { # assume $CLUSTER_PW is valid # sub send_tcp_cp { - my ($s,$cb,$timeout,@args)=@_; - my $data=sign($CLUSTER_PW,encode(@args)); - My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb); + my ($s,$cb,$timeout,@args)=@_; + my $data=sign($CLUSTER_PW,encode(@args)); + My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb); +} + +sub send_tcp_cp_sync { + my ($s, @args) = @_; + my $data = sign($CLUSTER_PW, encode(@args)); + $s->printflush(pack('n', length($data)) . $data); } + #---------------------------------------------------------- sub sync_cluster_pw { - my $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE); - - if ($st) { - if (!defined $CLUSTER_PW or $CLUSTER_PW_TIMESTAMP != $st->mtime) { - my $fh=new IO::File $CLUSTER_PW_FILE,'<'; - if (defined ($fh)) { - defined $CLUSTER_PW and warn "update cluster password\n"; - $fh->read($CLUSTER_PW,1024); - $CLUSTER_PW_TIMESTAMP=$st->mtime; - } else { - defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n"; - $CLUSTER_PW=undef; - } - } - } else { - defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n"; - $CLUSTER_PW=undef; - } - My::Select::timeout(60,\&sync_cluster_pw); - return defined $CLUSTER_PW; + my $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE); + + if ($st) { + if (!defined $CLUSTER_PW or $CLUSTER_PW_TIMESTAMP != $st->mtime) { + my $fh=new IO::File $CLUSTER_PW_FILE,'<'; + if (defined ($fh)) { + defined $CLUSTER_PW and warn "update cluster password\n"; + $fh->read($CLUSTER_PW,1024); + $CLUSTER_PW_TIMESTAMP=$st->mtime; + } else { + defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n"; + $CLUSTER_PW=undef; + } + } + } else { + defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n"; + $CLUSTER_PW=undef; + } + My::Select::timeout(60,\&sync_cluster_pw); + return defined $CLUSTER_PW; } #------------------------------------------------------------ # area routing -our %AREA_ROUTER= -( - wtf => '141.14.31.255', +our %AREA_ROUTER = ( + wtf => '141.14.31.255', ); our $area_socket; sub init_area { - exists $AREA_ROUTER{$my_hostname} or return; - warn "I am area router for $AREA_ROUTER{$my_hostname}\n"; + exists $AREA_ROUTER{$my_hostname} or return; + warn "I am area router for $AREA_ROUTER{$my_hostname}\n"; - $area_socket=new IO::Socket::INET (Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n"; - My::Select::reader($area_socket,sub{area_message($area_socket)}); + $area_socket=new IO::Socket::INET (Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n"; + My::Select::reader($area_socket,sub{area_message($area_socket)}); } sub area_message { - my ($area_socket)=@_; - my $data; - my $peer = $area_socket->recv($data,$UDP_MAX); - my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); - my $udp_peer_addr=inet_ntoa($peer_iaddr); + my ($area_socket)=@_; + my $data; + my $peer = $area_socket->recv($data,$UDP_MAX); + my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); + my $udp_peer_addr=inet_ntoa($peer_iaddr); - $donald_s->send_data($AREA_ROUTER{$my_hostname},$UDP_PORT,$data); # broadcast to our network - My::Select::reader_requeue(); + $donald_s->send_data($AREA_ROUTER{$my_hostname},$UDP_PORT,$data); # broadcast to our network + My::Select::reader_requeue(); } sub udp_broadcast_message { - my ($donald_s,@args)=@_; + my ($donald_s,@args)=@_; - defined $CLUSTER_PW or return; - my $data=sign($CLUSTER_PW,encode(@args)); - for my $ip (keys %AREA_ROUTER) { - $donald_s->send_data($ip,$UDP_PORT+1,$data); - } + defined $CLUSTER_PW or return; + my $data=sign($CLUSTER_PW,encode(@args)); + for my $ip (keys %AREA_ROUTER) { + $donald_s->send_data($ip,$UDP_PORT+1,$data); + } } sub area_config_as_string { - return join(',',map({"$_ => $AREA_ROUTER{$_}"} keys %AREA_ROUTER)) + return join(',',map({"$_ => $AREA_ROUTER{$_}"} keys %AREA_ROUTER)) } #------------------------------------------------------------ -our $PROG_FILE; # saved $0 - may be relative path +our $PROG_FILE; # saved $0 - may be relative path our $PROG_MTIME; sub check_progfile_status { - defined $PROG_FILE or $PROG_FILE=$0; - my @f=lstat $PROG_FILE or return; - $PROG_MTIME=$f[9]; + defined $PROG_FILE or $PROG_FILE=$0; + my @f=lstat $PROG_FILE or return; + $PROG_MTIME=$f[9]; } -sub version_info { # '20090617-155314' - my $t; - if (defined $PROG_MTIME) { - my @f=localtime($PROG_MTIME); - return sprintf '%4d%02d%02d-%02d%02d%02d',$f[5]+1900,$f[4]+1,$f[3],$f[2],$f[1],$f[0]; - } else { - return '?'; - } +sub version_info { # '20090617-155314' + my $t; + if (defined $PROG_MTIME) { + my @f=localtime($PROG_MTIME); + return sprintf '%4d%02d%02d-%02d%02d%02d',$f[5]+1900,$f[4]+1,$f[3],$f[2],$f[1],$f[0]; + } else { + return '?'; + } } #------------------------------------------------------------ sub pload_debug { - my ($why)=@_; - open T,'>','/var/log/pload_debug.txt' or return; - print T "$why at ".scalar(localtime),"\n"; - my $running_proc=running_proc(); - my $running_proc_10=running_proc_10(); - printf T ("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc); - printf T ("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10); - $CPUS or init_cpuinfo; - $CPUS and printf T ("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS); - $CPUS and printf T ("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS); - system "uptime>>/var/log/pload_debug.txt"; - system "ps -AlfT>>/var/log/pload_debug.txt"; - system "cat /proc/stat>>/var/log/pload_debug.txt"; - system "cat /proc/swaps>>/var/log/pload_debug.txt"; - close T; + my ($why)=@_; + open T,'>','/var/log/pload_debug.txt' or return; + print T "$why at ".scalar(localtime),"\n"; + my $running_proc=running_proc(); + my $running_proc_10=running_proc_10(); + printf T ("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc); + printf T ("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10); + $CPUS or init_cpuinfo; + $CPUS and printf T ("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS); + $CPUS and printf T ("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS); + system "uptime>>/var/log/pload_debug.txt"; + system "ps -AlfT>>/var/log/pload_debug.txt"; + system "cat /proc/stat>>/var/log/pload_debug.txt"; + system "cat /proc/swaps>>/var/log/pload_debug.txt"; + close T; } sub check_overload() { - My::Select::timeout_requeue(600); - if ($CPUS) { - my $running_proc_10=running_proc_10(); - my $pload_10=running_proc_10()/$CPUS; - $pload_10>1.5 and warn (sprintf("pload>150%% (%3.1f%%)\n",$pload_10*100)); - } + My::Select::timeout_requeue(600); + if ($CPUS) { + my $running_proc_10=running_proc_10(); + my $pload_10=running_proc_10()/$CPUS; + $pload_10>1.5 and warn (sprintf("pload>150%% (%3.1f%%)\n",$pload_10*100)); + } } ##################################################### @@ -1733,253 +1685,384 @@ my $slave=0; my $exit_value=0; sub expand_hostconfig_hosts { - my ($filter)=@_; - open my $p,'-|','hostconfig','--list',$filter or die "hostconfig: $!\”n"; - return split ' ',readline($p); + my ($filter)=@_; + open my $p,'-|','hostconfig','--list',$filter or die "hostconfig: $!\”n"; + return split ' ',readline($p); } sub exec_at { - my ($host,@cmd)=@_; - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - my $s=new IO::Socket::INET(PeerAddr=>$host,PeerPort=>$CLP_PORT); - unless (defined $s) { - die "$host: $!\n"; - } - clp_send_message($s,'CMD',@cmd); - my $pbuffer=''; - my $olbuffer=''; - my $elbuffer=''; - My::Select::reader($s,sub{cmd_rx($host,$s,\$pbuffer,\$olbuffer,\$elbuffer)}); - $slave=1; - My::Select::run() if $slave;; + my ($host,@cmd)=@_; + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + my $s=new IO::Socket::INET(PeerAddr=>$host,PeerPort=>$CLP_PORT); + unless (defined $s) { + die "$host: $!\n"; + } + clp_send_message($s,'CMD',@cmd); + my $pbuffer=''; + my $olbuffer=''; + my $elbuffer=''; + My::Select::reader($s,sub{cmd_rx($host,$s,\$pbuffer,\$olbuffer,\$elbuffer)}); + $slave=1; + My::Select::run() if $slave;; } sub lsof { - my ($pattern)=@_; - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - for my $host (sort(expand_hostconfig_hosts('amd'))) { - next if $host eq 'tux'; - my $s=new IO::Socket::INET (PeerAddr=>$host,PeerPort=>$CLP_PORT); - unless (defined $s) { - warn "$host: $!\n"; - next; - } - clp_send_message($s,'LSOF',$pattern); - my $pbuffer=''; - my $olbuffer=''; - My::Select::reader($s,sub{lsof_rx($host,$s,\$pbuffer,\$olbuffer)}); - $slave++; - } - My::Select::run() if $slave;; + my ($pattern)=@_; + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + for my $host (sort(expand_hostconfig_hosts('amd'))) { + next if $host eq 'tux'; + my $s=new IO::Socket::INET (PeerAddr=>$host,PeerPort=>$CLP_PORT); + unless (defined $s) { + warn "$host: $!\n"; + next; + } + clp_send_message($s,'LSOF',$pattern); + my $pbuffer=''; + my $olbuffer=''; + My::Select::reader($s,sub{lsof_rx($host,$s,\$pbuffer,\$olbuffer)}); + $slave++; + } + My::Select::run() if $slave;; } sub lsof_rx { - my ($host,$s,$pbufref,$olbufref)=@_; - my $data; - defined $s->recv($data,$TCP_MAX*2) or return; - if (!length($data)) { - close $s; - --$slave; - exit $exit_value unless $slave; - return; - } - $$pbufref.=$data; - while (1) { - last if length($$pbufref)<2; - my $l=unpack('n',$$pbufref); - last if length($$pbufref)<2+$l; - my $msg=substr($$pbufref,2,$l); - $$pbufref=substr($$pbufref,2+$l); - lsof_msg($host,$s,$msg,$olbufref); - } - My::Select::reader_requeue(); + my ($host,$s,$pbufref,$olbufref)=@_; + my $data; + defined $s->recv($data,$TCP_MAX*2) or return; + if (!length($data)) { + close $s; + --$slave; + exit $exit_value unless $slave; + return; + } + $$pbufref.=$data; + while (1) { + last if length($$pbufref)<2; + my $l=unpack('n',$$pbufref); + last if length($$pbufref)<2+$l; + my $msg=substr($$pbufref,2,$l); + $$pbufref=substr($$pbufref,2+$l); + lsof_msg($host,$s,$msg,$olbufref); + } + My::Select::reader_requeue(); } sub lsof_msg { - my ($host,$s,$data,$olbufref)=@_; - my $msg=$$olbufref.$data; - for ($msg=~/([^\n]*\n)/gs) { - print "$host $_"; - } - ($$olbufref)=$msg=~/([^\n]*)\z/; + my ($host,$s,$data,$olbufref)=@_; + my $msg=$$olbufref.$data; + for ($msg=~/([^\n]*\n)/gs) { + print "$host $_"; + } + ($$olbufref)=$msg=~/([^\n]*)\z/; } sub cmd_rx { - my ($host,$s,$pbufref,$olbufref,$elbufref)=@_; - my $data; - defined $s->recv($data,$TCP_MAX*2) or return; - if (!length($data)) { - close $s; - --$slave; - exit $exit_value unless $slave; - return; - } - $$pbufref.=$data; - while (1) { - last if length($$pbufref)<2; - my $l=unpack('n',$$pbufref); - last if length($$pbufref)<2+$l; - my $msg=substr($$pbufref,2,$l); - $$pbufref=substr($$pbufref,2+$l); - cmd_msg($host,$s,$msg,$olbufref,$elbufref); - } - My::Select::reader_requeue(); + my ($host,$s,$pbufref,$olbufref,$elbufref)=@_; + my $data; + defined $s->recv($data,$TCP_MAX*2) or return; + if (!length($data)) { + close $s; + --$slave; + exit $exit_value unless $slave; + return; + } + $$pbufref.=$data; + while (1) { + last if length($$pbufref)<2; + my $l=unpack('n',$$pbufref); + last if length($$pbufref)<2+$l; + my $msg=substr($$pbufref,2,$l); + $$pbufref=substr($$pbufref,2+$l); + cmd_msg($host,$s,$msg,$olbufref,$elbufref); + } + My::Select::reader_requeue(); } sub cmd_msg { - my ($host,$s,$data,$olbufref,$elbufref)=@_; - - my $channel=substr($data,0,1); - if ($channel eq 'X') { - my $rdata=substr($data,1); - $rdata!=0 and $exit_value=1; - return; - } elsif ($channel eq 'O') { - my $msg=$$olbufref.substr($data,1); - for ($msg=~/([^\n]*\n)/gs) { - print "$host: $_"; - } - ($$olbufref)=$msg=~/([^\n]*)\z/; - } elsif ($channel eq 'E') { - my $msg=$$elbufref.substr($data,1); - for ($msg=~/([^\n]*\n)/gs) { - print STDERR "$host: $_"; - } - ($$elbufref)=$msg=~/([^\n]*)\z/; - } -} - -our $SYS_SENDFILE=40; # /usr/include/asm/unistd_64.h + my ($host,$s,$data,$olbufref,$elbufref)=@_; + + my $channel=substr($data,0,1); + if ($channel eq 'X') { + my $rdata=substr($data,1); + $rdata!=0 and $exit_value=1; + return; + } elsif ($channel eq 'O') { + my $msg=$$olbufref.substr($data,1); + for ($msg=~/([^\n]*\n)/gs) { + print "$host: $_"; + } + ($$olbufref)=$msg=~/([^\n]*)\z/; + } elsif ($channel eq 'E') { + my $msg=$$elbufref.substr($data,1); + for ($msg=~/([^\n]*\n)/gs) { + print STDERR "$host: $_"; + } + ($$elbufref)=$msg=~/([^\n]*)\z/; + } +} + +our $SYS_SENDFILE = 40; # /usr/include/asm/unistd_64.h sub sendfile { - my ($out_fd,$in_fd,$offset,$count)=@_; - my $ret=syscall($SYS_SENDFILE,fileno($out_fd),fileno($in_fd),$offset,$count); - return $ret<0 ? undef : $ret; + my ($out_fd,$in_fd,$offset,$count)=@_; + my $ret=syscall($SYS_SENDFILE,fileno($out_fd),fileno($in_fd),$offset,$count); + return $ret<0 ? undef : $ret; } sub clp_rx_PULL { - my ($s,$st_want)=@_; - - my $st_is=Donald::FileInfo->lstat($st_want->name); - if (!defined $st_is or $st_is->type ne 'F' or $st_is->size != $st_want->size or $st_is->mtime != $st_want->mtime) { - warn $st_want->name." requested by ".$s->peerhost.": no longer available\n"; - return; - } - my $fh; - unless (open $fh,'<',$st_want->name) { - warn $st_want->name.": $!\n"; - return; - } - my $bytes=$st_is->size; - my $cb_tmo=sub { My::Select::cancel_handle($s); }; - my $cb_write=sub { - my $l=sendfile($s,$fh,0,$bytes); - unless (defined $l) { - warn "$!"; - return; - } - $bytes-=$l; - if ($bytes) { - My::Select::writer_requeue if $bytes; - } else { - close($s); - } - }; - My::Select::timeout(5,$cb_tmo); - My::Select::writer($s,$cb_write); + my ($s,$st_want)=@_; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + if (!defined $st_is or $st_is->type ne 'F' or $st_is->size != $st_want->size or $st_is->mtime != $st_want->mtime) { + warn $st_want->name." requested by ".$s->peerhost.": no longer available\n"; + return; + } + my $fh; + unless (open $fh,'<',$st_want->name) { + warn $st_want->name.": $!\n"; + return; + } + my $bytes=$st_is->size; + my $cb_tmo=sub { My::Select::cancel_handle($s); }; + my $cb_write=sub { + my $l=sendfile($s,$fh,0,$bytes); + unless (defined $l) { + warn "$!"; + return; + } + $bytes-=$l; + if ($bytes) { + My::Select::writer_requeue if $bytes; + } else { + close($s); + } + }; + My::Select::timeout(5,$cb_tmo); + My::Select::writer($s,$cb_write); } sub udp_rx_push { - my ($ip,$st_want)=@_; - - my $filename=$st_want->name; - my $tmp_filename="$filename.tmp"; - - $ip eq $my_ip and return; - - my $st_is=Donald::FileInfo->lstat($st_want->name); - - unless ($st_want->type eq 'F') { - warn "$filename: type ".$st_want->type." not yet implemented\n"; - return; - } - - if ($st_is - && $st_is->type eq 'F' - && $st_is->size == $st_want->size - && $st_is->mtime == $st_want->mtime - && $st_is->uid == $st_want->uid - && $st_is->gid == $st_want->gid - && $st_is->perm == $st_want->perm - ) { - warn "$filename: already okay\n"; - return; - } - - if ($st_want->size==0) { - -e $tmp_filename and unlink($tmp_filename); - my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - # no need to fsync empty file - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n";; - warn "installed (empty) $filename\n"; - return; - } - - my $s; - $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,$TCP_TIMEOUT,sub { - $! and return warn "$ip: $!\n"; - send_tcp_cp($s,sub { - $! and return warn "$ip: $!\n"; - -e $tmp_filename and unlink($tmp_filename); - my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - - my $cb; - my $bytes=$st_want->size; - $cb=sub { - # note, we need to break the circular references $cb of our caller, if no longer needed - my ($buf)=@_; - if ($!) { warn "$ip: $!\n";$cb=undef;return; } - if (length($buf)==0) { warn "$ip: EOF\n";$cb=undef;return;} - $fh->print($buf) or return warn "$tmp_filename: $!\n"; - $bytes-=length($buf); - if ($bytes>0) { - My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); - return; - } - $cb=undef; - $fh->flush(); - $fh->sync(); - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; - warn "installed $filename\n"; - }; - My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); - },$TCP_TIMEOUT,'PULL',$st_want); - }); + my ($ip,$st_want)=@_; + + my $filename=$st_want->name; + my $tmp_filename="$filename.tmp"; + + $ip eq $my_ip and return; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + + unless ($st_want->type eq 'F') { + warn "$filename: type ".$st_want->type." not yet implemented\n"; + return; + } + + if ($st_is + && $st_is->type eq 'F' + && $st_is->size == $st_want->size + && $st_is->mtime == $st_want->mtime + && $st_is->uid == $st_want->uid + && $st_is->gid == $st_want->gid + && $st_is->perm == $st_want->perm + ) { + warn "$filename: already okay\n"; + return; + } + + if ($st_want->size==0) { + -e $tmp_filename and unlink($tmp_filename); + my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + defined $fh or return warn "$tmp_filename: $!\n"; + # no need to fsync empty file + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n";; + warn "installed (empty) $filename\n"; + return; + } + + my $s; + $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,$TCP_TIMEOUT,sub { + $! and return warn "$ip: $!\n"; + send_tcp_cp($s,sub { + $! and return warn "$ip: $!\n"; + -e $tmp_filename and unlink($tmp_filename); + my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + defined $fh or return warn "$tmp_filename: $!\n"; + + my $cb; + my $bytes=$st_want->size; + $cb=sub { + # note, we need to break the circular references $cb of our caller, if no longer needed + my ($buf)=@_; + if ($!) { warn "$ip: $!\n";$cb=undef;return; } + if (length($buf)==0) { warn "$ip: EOF\n";$cb=undef;return;} + $fh->print($buf) or return warn "$tmp_filename: $!\n"; + $bytes-=length($buf); + if ($bytes>0) { + My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); + return; + } + $cb=undef; + $fh->flush(); + $fh->sync(); + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; + warn "installed $filename\n"; + }; + My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); + },$TCP_TIMEOUT,'PULL',$st_want); + }); +} + +our %TRUSTED_IP = ( + '141.14.28.170' => 1, # afk + '141.14.16.131' => 1, # wtf +); + +sub is_trusted_ip { + my ($ip) = @_; + return exists $TRUSTED_IP{$ip} ? 1 : 0; +} + +sub udp_rx_push2 { + my ($ip, $st_ary, $post_ary) = @_; + + unless (is_trusted_ip($ip)) { + warn "reject to pull files from $ip : not trusted\n"; + return; + } + my $pid = fork; + unless (defined $pid) { + warn "$!\n"; + return + } + $pid != 0 and return; + + if ($ip ne $my_ip) { +FILE: + for my $st_want (@$st_ary) { + my $filename=$st_want->name; + my $tmp_filename="$filename.tmp"; + unless ($st_want->type eq 'F') { + warn "$filename: type ".$st_want->type." not yet implemented\n"; + next; + } + my $st_is = Donald::FileInfo->lstat($st_want->name); + if ($st_is + && $st_is->type eq 'F' + && $st_is->size == $st_want->size + && $st_is->mtime == $st_want->mtime + && $st_is->uid == $st_want->uid + && $st_is->gid == $st_want->gid + && $st_is->perm == $st_want->perm + ) { + warn "$filename: already okay\n"; + next; + } + if ($st_want->size == 0) { + -e $tmp_filename and unlink($tmp_filename); + my $fh = IO::File->new($tmp_filename, O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW, 0); + unless (defined $fh) { + warn "$tmp_filename: $!\n"; + next; + } + # no need to fsync empty file + chown $st_want->uid, $st_want->gid, $tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm, $tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime, $st_want->mtime, $tmp_filename); + rename($tmp_filename, $filename) or return warn "rename $tmp_filename $filename: $!\n";; + warn "installed (empty) $filename\n"; + next; + } + my $s = new IO::Socket::INET(PeerAddr => $ip, PeerPort => $CLP_PORT); + unless ($s) { + warn "$ip: $!\n"; + next; + } + send_tcp_cp_sync($s, 'PULL', $st_want); + -e $tmp_filename and unlink($tmp_filename); + my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + unless (defined $fh) { + warn "$tmp_filename: $!\n"; + next; + } + my $bytes = $st_want->size; + while (1) { + my $buf = ""; + my $len = read($s, $buf, 1024); + if ($len < 0) { + warn "$ip $filename: $!\n"; + next FILE; + } + if ($len == 0) { + warn "$ip $filename: file received to short\n"; + next FILE; + } + if ($len > $bytes) { + warn "$ip $filename: file received to long\n"; + next FILE; + } + print $fh $buf; + $bytes -= $len; + if ($bytes == 0) { + $fh->flush(); + $fh->sync(); + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; + warn "installed $filename\n"; + next FILE; + } + } + } + } + open STDIN, '<', '/dev/null'; + chdir '/'; + alarm(60); + for my $cmd (@$post_ary) { + exists $CMD{$cmd} and warn "executing $CMD{$cmd}\n"; + exists $CMD{$cmd} and system '/bin/sh', '-c', $CMD{$cmd}; + } + exit; } sub cmd_push { - my @files=@_; - for my $filename (@files) { - $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; - -e $filename or die "$filename: no such file\n"; - } - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - for my $filename (@files) { - my $st=Donald::FileInfo->lstat($filename); - defined $st or die "$filename: $!\n"; - $st->type eq 'F' or die "$filename: only plain files currently supported\n"; - open my $test,'<',$filename or die "$filename: $!\n"; - udp_broadcast_message($donald_s,'push',$my_ip,$st); - } + my ($post, @files) = @_; + + is_trusted_ip($my_ip) or die "This command only works on a trusted host\n"; + + for my $cmd (@$post) { + exists $CMD{$cmd} or die "$cmd: only these commands are allowed: " . join(', ', keys %CMD) . "\n"; + } + for my $filename (@files) { + $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; + -e $filename or die "$filename: no such file\n"; + } + my @st = (); + for my $filename (@files) { + my $st = Donald::FileInfo->lstat($filename); + defined $st or die "$filename: $!\n"; + $st->type eq 'F' or die "$filename: only plain files currently supported\n"; + open my $test,'<', $filename or die "$filename: $!\n"; + push @st, $st; + } + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s, 'push.2', $my_ip, \@st, $post); +} + +sub cmd_exec { + my @cmd = @_; + for my $cmd (@cmd) { + exists $CMD{$cmd} or die "$cmd: only these commands are allowed: " . join(', ', keys %CMD) . "\n"; + } + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s, 'exec.2', @cmd); } #------------------------------------------------------------ @@ -1988,23 +2071,23 @@ our $TRUSTCHECK_PORT=236; our $trustcheck_listen_socket; sub trustcheck_init { - $trustcheck_listen_socket=new IO::Socket::INET(LocalPort=>$TRUSTCHECK_PORT,Proto=>'tcp',Listen=>10,ReuseAddr=>1); - defined $trustcheck_listen_socket or die "$!\n"; - My::Select::reader($trustcheck_listen_socket,\&trustcheck_connect_request); + $trustcheck_listen_socket=new IO::Socket::INET(LocalPort=>$TRUSTCHECK_PORT,Proto=>'tcp',Listen=>10,ReuseAddr=>1); + defined $trustcheck_listen_socket or die "$!\n"; + My::Select::reader($trustcheck_listen_socket,\&trustcheck_connect_request); } sub trustcheck_connect_request { - My::Select::reader_requeue(); - my $socket=$trustcheck_listen_socket->accept(); - $socket->blocking(0); - my $hostname = gethostbyaddr(inet_aton($socket->peerhost()), AF_INET); - system 'hostconfig','--host',$hostname,'amd'; - if ($? == 0) { - $socket->send("I trust you\n", 0); - } elsif ($? == 256) { - $socket->send("I don't trust you\n", 0); - } - close($socket); + My::Select::reader_requeue(); + my $socket=$trustcheck_listen_socket->accept(); + $socket->blocking(0); + my $hostname = gethostbyaddr(inet_aton($socket->peerhost()), AF_INET); + system 'hostconfig','--host',$hostname,'amd'; + if ($? == 0) { + $socket->send("I trust you\n", 0); + } elsif ($? == 256) { + $socket->send("I don't trust you\n", 0); + } + close($socket); } #------------------------------------------------------------ @@ -2013,7 +2096,7 @@ use constant USAGE => <<'__EOF__'; usage: $0 [options] - --push file # broadcast this file + --push file # broadcast this file --push-amd-tar # broadcast /etc/amd --send-restart # broadcast a restart request to all nodes --exec mkmotd # execute /usr/sbin/mkmotd.pl on all nodes @@ -2026,96 +2109,99 @@ usage: $0 [options] --daemon # start a daemon - push files.... # push files over tcp + push [--post CMD] files... # push files over tcp + exec CMD... # execute CMD on all nodes + CMD : mkmotd | flush-gidcache | reexport | make-automaps __EOF__ use Getopt::Long; -GetOptions -( - 'daemon' => \$options{'daemon'}, - 'push=s' => \$options{'push'}, - 'exec=s' => \$options{'exec'}, - 'push-amd-tar' => \$options{'push_amd_tar'}, - 'send-restart' => \$options{'send-restart'}, - 'flush-gidcache' => \$options{'flush-gidcache'}, - 'make-automaps' => \$options{'make-automaps'}, - 'reexport' => \$options{'reexport'}, - 'lsof=s' => \$options{'lsof'}, +GetOptions ( + 'daemon' => \$options{'daemon'}, + 'push=s' => \$options{'push'}, + 'exec=s' => \$options{'exec'}, + 'push-amd-tar' => \$options{'push_amd_tar'}, + 'send-restart' => \$options{'send-restart'}, + 'flush-gidcache' => \$options{'flush-gidcache'}, + 'make-automaps' => \$options{'make-automaps'}, + 'reexport' => \$options{'reexport'}, + 'lsof=s' => \$options{'lsof'}, + 'post=s@' => \$options{'post'}, ) or die USAGE; if (defined $options{'push'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - push_file($donald_s,$options{'push'}); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + push_file($donald_s,$options{'push'}); } elsif (defined $options{'exec'}) { - if (substr($options{'exec'},0,1) eq '@') { - length(length($options{'exec'})>1) or die USAGE; - @ARGV>=1 or die USAGE; - exec_at(substr($options{'exec'},1),@ARGV); - } else { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - send_exec($donald_s,$options{'exec'}); - } + if (substr($options{'exec'},0,1) eq '@') { + length(length($options{'exec'})>1) or die USAGE; + @ARGV>=1 or die USAGE; + exec_at(substr($options{'exec'},1),@ARGV); + } else { + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + send_exec($donald_s,$options{'exec'}); + } } elsif (defined $options{'push_amd_tar'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - push_amd_tar($donald_s); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + push_amd_tar($donald_s); } elsif (defined $options{'send-restart'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'restart'); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s,'restart'); } elsif (defined $options{'flush-gidcache'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'flush-gidcache'); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s,'flush-gidcache'); } elsif (defined $options{'make-automaps'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'make-automaps'); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s,'make-automaps'); } elsif (defined $options{'reexport'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'reexport'); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s,'reexport'); } elsif (defined $options{'daemon'}) { - $SIG{PIPE}='IGNORE'; + $SIG{PIPE}='IGNORE'; - $donald_s=new My::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n"; - $donald_s->receive_data(\&udp_message,$donald_s); + $donald_s=new My::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n"; + $donald_s->receive_data(\&udp_message,$donald_s); - openlog('clusterd','pid','daemon'); - Sys::Syslog::setlogsock('unix'); # with 'native' we get EOLs in the logfile, option "noeol" doesn't work + openlog('clusterd','pid','daemon'); + Sys::Syslog::setlogsock('unix'); # with 'native' we get EOLs in the logfile, option "noeol" doesn't work - check_progfile_status(); - warn "server started - ".version_info()."\n"; - init_area(); - mgmt_init(); - clp_init(); - trustcheck_init(); + check_progfile_status(); + warn "server started - ".version_info()."\n"; + init_area(); + mgmt_init(); + clp_init(); + trustcheck_init(); - sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n"; + sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n"; - My::Select::timeout(60,\&purge_old_receiver); - My::Select::timeout(rand(60),\&send_stat); - My::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha; - $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init(); - $my_hostname eq 'macheteinfach' and My::NetlogReceiver::init(); - My::Select::timeout(600,\&check_overload); + My::Select::timeout(60,\&purge_old_receiver); + My::Select::timeout(rand(60),\&send_stat); + My::Select::timeout(0,\&sample_rproc); + $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init(); + $my_hostname eq 'macheteinfach' and My::NetlogReceiver::init(); + My::Select::timeout(600,\&check_overload); - My::Select::run(); + My::Select::run(); } elsif ($options{'lsof'}) { - lsof($options{'lsof'}); -} elsif ($options{'kill'}) { - Donald::Tools::kill_previous_server('clusterd'); + lsof($options{'lsof'}); } else { - @ARGV or die USAGE; - my ($cmd,@args)=@ARGV; - if ($cmd eq 'push') { - @args>0 or die USAGE; - cmd_push(@args); - } else { - die USAGE; - } + @ARGV or die USAGE; + my ($cmd,@args)=@ARGV; + if ($cmd eq 'push') { + @args>0 or die USAGE; + cmd_push($options{'post'} || [], @args); + } elsif ($cmd eq 'exec') { + @args > 0 or die USAGE; + cmd_exec(@args); + } else { + die USAGE; + } }