From 75d1f1ce1e86dcbbe4bbefd5d245d7ce1a6ebd00 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Mon, 27 Jan 2025 09:08:46 +0100 Subject: [PATCH 1/7] clusterd: Remove tabs --- clusterd/clusterd | 2863 ++++++++++++++++++++++----------------------- 1 file changed, 1431 insertions(+), 1432 deletions(-) diff --git a/clusterd/clusterd b/clusterd/clusterd index 1f9cfb8..818a145 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -16,82 +16,82 @@ use Storable; use Digest::MD5; sub hostname { - our $hostname; - unless (defined $hostname) { - $hostname=lc `/bin/hostname`; - chomp($hostname); - $hostname =~ s/\.molgen\.mpg\.de$//; - } - return $hostname; + our $hostname; + unless (defined $hostname) { + $hostname=lc `/bin/hostname`; + chomp($hostname); + $hostname =~ s/\.molgen\.mpg\.de$//; + } + return $hostname; } sub machine { - our $machine; - chomp($machine=`uname -m`) unless defined $machine; - return $machine; + our $machine; + chomp($machine=`uname -m`) unless defined $machine; + return $machine; } sub is_alpha { - return machine() eq 'alpha'; + return machine() eq 'alpha'; } sub uptime { - open U,'<','/proc/uptime' or die "/proc/uptime: $!\n"; - my $data; - sysread(U,$data,1024); - close U; - $data=~ /^(\d+\.?\d*)/ or die "bad data from /proc/uptime: $data\n"; - return $1+0; + open U,'<','/proc/uptime' or die "/proc/uptime: $!\n"; + my $data; + sysread(U,$data,1024); + close U; + $data=~ /^(\d+\.?\d*)/ or die "bad data from /proc/uptime: $data\n"; + return $1+0; } sub encode { - return Storable::nfreeze([@_]); + return Storable::nfreeze([@_]); } sub sign { - my ($password,$data)=@_; - return Digest::MD5::md5($password.$data).$data; # 16 byte prefix + my ($password,$data)=@_; + return Digest::MD5::md5($password.$data).$data; # 16 byte prefix } -sub check_sign { # signed-data -> undef or signed-data -> data - my ($password,$data)=@_; - length $data>16 or return undef; - my $rx_digest=substr($data,0,16); - my 
$signature=Digest::MD5::md5($password.substr($data,16)); - $rx_digest eq $signature or return undef; - return substr($data,16); +sub check_sign { # signed-data -> undef or signed-data -> data + my ($password,$data)=@_; + length $data>16 or return undef; + my $rx_digest=substr($data,0,16); + my $signature=Digest::MD5::md5($password.substr($data,16)); + $rx_digest eq $signature or return undef; + return substr($data,16); } sub decode { - my ($data)=@_; - my $msg; - eval { - $msg=Storable::thaw($data); - }; - $@ and return undef; - return @$msg; + my ($data)=@_; + my $msg; + eval { + $msg=Storable::thaw($data); + }; + $@ and return undef; + return @$msg; } -sub kill_previous_server { # kill_previous_server('clusterd') - my ($command)=@_; - my $ret=0; +sub kill_previous_server { # kill_previous_server('clusterd') + my ($command)=@_; + my $ret=0; - # quickfix - dont kill our parent which might be the init.d/script with the same name.... + # quickfix - dont kill our parent which might be the init.d/script with the same name.... 
- my $ppid; - for (`ps -o ppid,comm -p $$`) { - $ppid=$1 if /(\d+)/; - } + my $ppid; + for (`ps -o ppid,comm -p $$`) { + $ppid=$1 if /(\d+)/; + } - for (`ps -Ao pid,comm`) { - my @F=split; - if ($F[1] eq $command && $F[0] ne $$ && $F[0] ne $ppid) { - kill 1=>$F[0]; - warn "stopped $command pid $F[0]\n"; - $ret=1; - } - } - return $ret; + for (`ps -Ao pid,comm`) { + my @F=split; + if ($F[1] eq $command && $F[0] ne $$ && $F[0] ne $ppid) { + kill 1=>$F[0]; + warn "stopped $command pid $F[0]\n"; + $ret=1; + } + } + return $ret; } #------------------------------------- @@ -114,15 +114,15 @@ use Class::Struct (map {$_=>'$'} qw(name dev ino type perm nlink uid gid rdev si use Fcntl ':mode'; sub fn_escape { - my ($fn)=@_; - $fn=~s/([[:^graph:]\\])/'\x'.sprintf('%02x',ord($1))/ge; - return $fn; + my ($fn)=@_; + $fn=~s/([[:^graph:]\\])/'\x'.sprintf('%02x',ord($1))/ge; + return $fn; } sub fn_unescape { - my ($fn)=@_; - $fn=~s/\\x([0-9a-f]{0,2})/chr(hex($1))/gie; - return $fn; + my ($fn)=@_; + $fn=~s/\\x([0-9a-f]{0,2})/chr(hex($1))/gie; + return $fn; } # @@ -144,83 +144,83 @@ sub fn_unescape { # S path perm uid gid - - hardlink sub name_escaped { - my ($self)=shift; - return fn_escape($self->name); + my ($self)=shift; + return fn_escape($self->name); } sub fileid { - my ($self)=shift; - return($self->dev.'.'.$self->ino); + my ($self)=shift; + return($self->dev.'.'.$self->ino); } sub export_index { - my ($self)=@_; - my $type=$self->type; - return ( - $type, - fn_escape($self->name), - $self->perm, - $self->uid, - $self->gid, - ( - $type eq 'F' ? $self->size : - $type eq 'L' ? fn_escape($self->target) : - $type eq 'B' || $type eq 'C' ? $self->rdev : - '-' - ), - $type eq 'F'||$type eq 'D' ? $self->mtime : '-', - ); + my ($self)=@_; + my $type=$self->type; + return ( + $type, + fn_escape($self->name), + $self->perm, + $self->uid, + $self->gid, + ( + $type eq 'F' ? $self->size : + $type eq 'L' ? fn_escape($self->target) : + $type eq 'B' || $type eq 'C' ? 
$self->rdev : + '-' + ), + $type eq 'F'||$type eq 'D' ? $self->mtime : '-', + ); } sub import_index { - my ($class,@F)=@_; - my $type=$F[0]; - return bless [ - fn_unescape($F[1]), # name - 0,0, # dev,ino - $type, # type - $F[2], # perm - 0, # nlink - $F[3],$F[4], # uid,gid - ($type eq 'B' || $type eq 'C' ? $F[5] : 0), # rdev - ($type eq 'F' ? $F[5] : 0 ), # size - ($type eq 'F'||$type eq 'D' ? $F[6] : 0 ), # mtime - ($type eq 'L' ? fn_unescape($F[5]) : ''), # target - ], - $class; + my ($class,@F)=@_; + my $type=$F[0]; + return bless [ + fn_unescape($F[1]), # name + 0,0, # dev,ino + $type, # type + $F[2], # perm + 0, # nlink + $F[3],$F[4], # uid,gid + ($type eq 'B' || $type eq 'C' ? $F[5] : 0), # rdev + ($type eq 'F' ? $F[5] : 0 ), # size + ($type eq 'F'||$type eq 'D' ? $F[6] : 0 ), # mtime + ($type eq 'L' ? fn_unescape($F[5]) : ''), # target + ], + $class; } sub lstat { - my ($class,$filename)=@_; - my $target; - my @f; - - unless (@f=lstat $filename) { - $!==2 and return undef; # ENOENT - die "$filename: $!\n"; - } - if (-l _) { - defined ($target=readlink($filename)) or die "$filename: $!\n"; - } - my $type = - S_ISREG($f[2]) ? 'F' : - S_ISDIR($f[2]) ? 'D' : - S_ISLNK($f[2]) ? 'L' : - S_ISBLK($f[2]) ? 'B' : - S_ISCHR($f[2]) ? 'C' : - S_ISFIFO($f[2]) ? 'P' : - S_ISSOCK($f[2]) ? 'S' : - die ("$filename: unsupported file type\n"); - return bless [ - $filename, # name - @f[0,1], # dev,ino - $type, # type - S_IMODE($f[2]), # perm - @f[3..7], # nlink,uid,gid,rdev,size - $f[9], # mtime - ($type eq 'L' ? $target : '') # target - ], - $class; + my ($class,$filename)=@_; + my $target; + my @f; + + unless (@f=lstat $filename) { + $!==2 and return undef; # ENOENT + die "$filename: $!\n"; + } + if (-l _) { + defined ($target=readlink($filename)) or die "$filename: $!\n"; + } + my $type = + S_ISREG($f[2]) ? 'F' : + S_ISDIR($f[2]) ? 'D' : + S_ISLNK($f[2]) ? 'L' : + S_ISBLK($f[2]) ? 'B' : + S_ISCHR($f[2]) ? 'C' : + S_ISFIFO($f[2]) ? 'P' : + S_ISSOCK($f[2]) ? 
'S' : + die ("$filename: unsupported file type\n"); + return bless [ + $filename, # name + @f[0,1], # dev,ino + $type, # type + S_IMODE($f[2]), # perm + @f[3..7], # nlink,uid,gid,rdev,size + $f[9], # mtime + ($type eq 'L' ? $target : '') # target + ], + $class; } #------------------------------------- @@ -230,122 +230,122 @@ our $time=Donald::Tools::uptime(); sub My::Select::time { - return $time; + return $time; } -our @TIMER=(); # ( [duetime,cb] , ... ) sorted by time +our @TIMER=(); # ( [duetime,cb] , ... ) sorted by time our $active_timer_cb; -sub timeout # cb=Select::timeout(seconds,cb) +sub timeout # cb=Select::timeout(seconds,cb) { - my ($delta,$cb)=@_; - my $duetime=$time+$delta; - @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); - return $cb; + my ($delta,$cb)=@_; + my $duetime=$time+$delta; + @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); + return $cb; } -sub timeout_cancel # cb=Select::timeout_cancel(cb) +sub timeout_cancel # cb=Select::timeout_cancel(cb) { - my ($cb)=@_; - @TIMER=grep {$_->[1] != $cb} @TIMER; - return $cb; + my ($cb)=@_; + @TIMER=grep {$_->[1] != $cb} @TIMER; + return $cb; } -sub timeout_requeue {timeout(shift,$active_timer_cb); } # Select::timeout_requeue(seconds) +sub timeout_requeue {timeout(shift,$active_timer_cb); } # Select::timeout_requeue(seconds) #------------------------------------ -our @READER; # ( [Handle,cb] , ... ) +our @READER; # ( [Handle,cb] , ... 
) our @WRITER; our @EXCEPT; -our $active_io; # [Handle,cb] +our $active_io; # [Handle,cb] -sub reader # cb = Select::reader(Handle,cb) +sub reader # cb = Select::reader(Handle,cb) { - my ($handle,$cb)=@_; - push @READER,[$handle,$cb]; - return $cb; + my ($handle,$cb)=@_; + push @READER,[$handle,$cb]; + return $cb; } -sub writer # cb = Select::writer(Handle,cb) +sub writer # cb = Select::writer(Handle,cb) { - my ($handle,$cb)=@_; - push @WRITER,[$handle,$cb]; - return $cb; + my ($handle,$cb)=@_; + push @WRITER,[$handle,$cb]; + return $cb; } -sub except # cb = Select::except(Handle,cb) +sub except # cb = Select::except(Handle,cb) { - my ($handle,$cb)=@_; - push @EXCEPT,[$handle,$cb]; - return $cb; + my ($handle,$cb)=@_; + push @EXCEPT,[$handle,$cb]; + return $cb; } sub reader_requeue {push @READER,$active_io} sub writer_requeue {push @WRITER,$active_io} sub except_requeue {push @EXCEPT,$active_io} -sub cancel # $cb = Select::cancel(cb) +sub cancel # $cb = Select::cancel(cb) { - my ($cb)=@_; - defined $cb or $cb=$active_io->[1]; - @READER=grep {$_->[1] != $cb} @READER; - @WRITER=grep {$_->[1] != $cb} @WRITER; - @EXCEPT=grep {$_->[1] != $cb} @EXCEPT; + my ($cb)=@_; + defined $cb or $cb=$active_io->[1]; + @READER=grep {$_->[1] != $cb} @READER; + @WRITER=grep {$_->[1] != $cb} @WRITER; + @EXCEPT=grep {$_->[1] != $cb} @EXCEPT; } sub cancel_handle { - my ($handle)=@_; - @READER=grep {$_->[0] != $handle} @READER; - @WRITER=grep {$_->[0] != $handle} @WRITER; - @EXCEPT=grep {$_->[0] != $handle} @EXCEPT; + my ($handle)=@_; + @READER=grep {$_->[0] != $handle} @READER; + @WRITER=grep {$_->[0] != $handle} @WRITER; + @EXCEPT=grep {$_->[0] != $handle} @EXCEPT; } sub run { - while (1) { - $time=Donald::Tools::uptime(); - while (@TIMER && $TIMER[0]->[0]<=$time) { - $active_timer_cb=(shift @TIMER)->[1]; - $active_timer_cb->(); - } - $active_timer_cb=undef; - - my ($rvec,$wvec,$evec)=('','',''); - - for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 } ; - for (@WRITER) { 
vec($wvec,$_->[0]->fileno,1)=1 } ; - for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ; - - my $ready=select($rvec,$wvec,$evec,1); - if ($ready>0) { - for (my $i=0;$i<@READER;$i++) { - if (vec($rvec,$READER[$i]->[0]->fileno,1)) { - $active_io=splice @READER,$i,1; - $active_io->[1]->(); - $active_io=undef; - last; - } - } - for (my $i=0;$i<@WRITER;$i++) { - if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) { - $active_io=splice @WRITER,$i,1; - $active_io->[1]->(); - $active_io=undef; - last; - } - } - for (my $i=0;$i<@EXCEPT;$i++) { - if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) { - $active_io=splice @EXCEPT,$i,1; - $active_io->[1]->(); - $active_io=undef; - last; - } - } - } - } + while (1) { + $time=Donald::Tools::uptime(); + while (@TIMER && $TIMER[0]->[0]<=$time) { + $active_timer_cb=(shift @TIMER)->[1]; + $active_timer_cb->(); + } + $active_timer_cb=undef; + + my ($rvec,$wvec,$evec)=('','',''); + + for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 } ; + for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 } ; + for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ; + + my $ready=select($rvec,$wvec,$evec,1); + if ($ready>0) { + for (my $i=0;$i<@READER;$i++) { + if (vec($rvec,$READER[$i]->[0]->fileno,1)) { + $active_io=splice @READER,$i,1; + $active_io->[1]->(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@WRITER;$i++) { + if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) { + $active_io=splice @WRITER,$i,1; + $active_io->[1]->(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@EXCEPT;$i++) { + if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) { + $active_io=splice @EXCEPT,$i,1; + $active_io->[1]->(); + $active_io=undef; + last; + } + } + } + } } #-------------------------------------- @@ -354,52 +354,52 @@ package My::Select::INET ; use Carp; use IO::Socket::INET; -our $UDP_MAX=1472; # for broadcast on alphas +our $UDP_MAX = 1472; # for broadcast on alphas our $SOL_SOCKET=1; our $SO_ERROR=4; sub get_socket_error { - my ($s)=@_; - return 
unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR)); + my ($s)=@_; + return unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR)); } -sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) +sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) { - my ($class,@args)=@_; - our $socket=new IO::Socket::INET (@args) or return undef; - return bless \$socket,$class; + my ($class,@args)=@_; + our $socket=new IO::Socket::INET (@args) or return undef; + return bless \$socket,$class; } sub send_data { - my ($self,$ip,$port,$data)=@_; - my $ip_address=inet_aton($ip); - unless (defined $ip_address) {carp("can't resolve $ip\n");return undef} - unless (length($data)<=$UDP_MAX) {carp("message to long\n");return undef} - $$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n"; + my ($self,$ip,$port,$data)=@_; + my $ip_address=inet_aton($ip); + unless (defined $ip_address) {carp("can't resolve $ip\n");return undef} + unless (length($data)<=$UDP_MAX) {carp("message to long\n");return undef} + $$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n"; } sub reader { - my ($self,$sub)=@_; - My::Select::reader($$self,$sub); + my ($self,$sub)=@_; + My::Select::reader($$self,$sub); } sub receive_data { - my ($self,$cb,@args)=@_; + my ($self,$cb,@args)=@_; - my $receive_data_cb=sub { - my $data; - my $peer = $$self->recv($data,$UDP_MAX); - my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); - my $udp_peer_addr=inet_ntoa($peer_iaddr); - My::Select::reader_requeue(); - $cb->($data,$udp_peer_addr,$udp_peer_port); - }; - My::Select::reader($$self,$receive_data_cb); + my $receive_data_cb=sub { + my $data; + my $peer = $$self->recv($data,$UDP_MAX); + my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); + my $udp_peer_addr=inet_ntoa($peer_iaddr); + My::Select::reader_requeue(); + $cb->($data,$udp_peer_addr,$udp_peer_port); + }; + My::Select::reader($$self,$receive_data_cb); } # send_tcp($socket,$data,$timeout,$cb); @@ -410,35 +410,35 @@ sub 
receive_data # all arguments required # sub send_tcp { - my ($s,$data,$timeout,$cb)=@_; - my $len=$s->send($data,0); - defined $len or return $cb->(); - if ($len==length($data)) { - $!=0; - $cb->(); - return; - } - my $pos=$len; - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(); - }; - my $cb_write=sub { - My::Select::timeout_cancel($cb_tmo); - my $len=send($s,substr($data,$pos),0); - defined $len or return $cb->(); - if ($len==length($data)-$pos) { - $!=0; - $cb->(); - return; - } - $pos+=$len; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer_requeue(); - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer($s,$cb_write); + my ($s,$data,$timeout,$cb)=@_; + my $len=$s->send($data,0); + defined $len or return $cb->(); + if ($len==length($data)) { + $!=0; + $cb->(); + return; + } + my $pos=$len; + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(); + }; + my $cb_write=sub { + My::Select::timeout_cancel($cb_tmo); + my $len=send($s,substr($data,$pos),0); + defined $len or return $cb->(); + if ($len==length($data)-$pos) { + $!=0; + $cb->(); + return; + } + $pos+=$len; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer_requeue(); + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer($s,$cb_write); } # $socket = connect_tcp ($ip,$port,$timeout,$cb) @@ -449,23 +449,23 @@ sub send_tcp { # all arguments required # sub connect_tcp { - my ($ip,$port,$timeout,$cb)=@_; - - my $s=new IO::Socket::INET (PeerAddr=>$ip,PeerPort=>$port,Blocking=>0); - defined $s or return $cb->(); - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(); - }; - my $cb_write=sub { - My::Select::timeout_cancel($cb_tmo); - $!=get_socket_error($s); - $cb->(); - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer($s,$cb_write); - return $s; + my ($ip,$port,$timeout,$cb)=@_; + + my $s=new IO::Socket::INET (PeerAddr=>$ip,PeerPort=>$port,Blocking=>0); + defined $s or return $cb->(); + my 
$cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(); + }; + my $cb_write=sub { + My::Select::timeout_cancel($cb_tmo); + $!=get_socket_error($s); + $cb->(); + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer($s,$cb_write); + return $s; } # read_with_timeout($socket,$callback,$timeout) @@ -473,25 +473,25 @@ sub connect_tcp { # asynchronously read from tcp socket. # sub read_with_timeout { - my ($s,$cb,$timeout)=@_; - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(undef); - }; - my $cb_read=sub { - My::Select::timeout_cancel($cb_tmo); - my $buf=''; - my $l=sysread($s,$buf,102400,0); - if (!defined $l) { - $cb->(undef); - } else { - $!=0; - $cb->($buf); - } - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::reader($s,$cb_read) + my ($s,$cb,$timeout)=@_; + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(undef); + }; + my $cb_read=sub { + My::Select::timeout_cancel($cb_tmo); + my $buf=''; + my $l=sysread($s,$buf,102400,0); + if (!defined $l) { + $cb->(undef); + } else { + $!=0; + $cb->($buf); + } + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::reader($s,$cb_read) } #-------------------------------------- @@ -501,7 +501,7 @@ package My::Cluster::Updown; # monitor nodes # -our %H; # ( name=> [state , last_seen , expect_seq , @data ] , ... ) +our %H; # ( name=> [state , last_seen , expect_seq , @data ] , ... 
) our $MONITOR_STARTING=1; our %DUPLICATES; @@ -516,139 +516,138 @@ our %DUPLICATES; use Storable; sub save_state { - store \%H,'/root/clusterd.monitor.state'; + store \%H,'/root/clusterd.monitor.state'; } sub restore_state { - -e '/root/clusterd.monitor.state' and %H=%{retrieve '/root/clusterd.monitor.state'}; - for (values %H) { - $_->[1]=0; - } + -e '/root/clusterd.monitor.state' and %H=%{retrieve '/root/clusterd.monitor.state'}; + for (values %H) { + $_->[1]=0; + } } sub status { - my ($f)=@_; - my (@up,@down); - for (sort keys %H) { push @up, $_ if $H{$_}->[0] eq 'UP' } - for (sort keys %H) { push @down,$_ if $H{$_}->[0] eq 'DOWN' } - - $f->printf("UP (%d): %s\n\n",scalar(@up),join(' ',@up)); - $f->printf("DOWN (%d): %s\n\n",scalar(@down),join(' ',@down)); - - my (@k,$max); - - @k=sort({$H{$b}->[3] <=> $H{$a}->[3]} grep({$H{$_}->[0] eq 'UP'} (keys %H))); - $max=0; - $f->print("TOP 10: "); - for (@k) { - $f->print(sprintf "%s (%4.2f) ",$_,$H{$_}->[3]); - last if $max++>=10; - } - $f->print("\n\n"); - - @k=sort({($H{$b}->[5]||0) <=> ($H{$a}->[5]||0)} grep({$H{$_}->[0] eq 'UP'} (keys %H))); - $max=0; - $f->print("TOP 10 CPU load : "); - for (@k) { - $f->print(sprintf "%s (%3.1f%%) ",$_,($H{$_}->[5]||0)*100); - last if $max++>=10; - } - $f->print("\n\n"); - - @k=sort({($H{$b}->[6]||0) <=> ($H{$a}->[6]||0)} grep({$H{$_}->[0] eq 'UP'} (keys %H))); - $max=0; - my $total_bogomips=0; - $f->print("TOP 10 free capacity : "); - for (@k) { - my $bogo=$H{$_}->[6]||0; - $f->print(sprintf "%s (%3.1f) ",$_,$bogo) if $max++<10; - $total_bogomips+=$bogo - } - $f->printf("\ntotal available bogomips: %d\n\n",$total_bogomips); + my ($f)=@_; + my (@up,@down); + for (sort keys %H) { push @up, $_ if $H{$_}->[0] eq 'UP' } + for (sort keys %H) { push @down,$_ if $H{$_}->[0] eq 'DOWN' } + + $f->printf("UP (%d): %s\n\n",scalar(@up),join(' ',@up)); + $f->printf("DOWN (%d): %s\n\n",scalar(@down),join(' ',@down)); + + my (@k,$max); + + @k=sort({$H{$b}->[3] <=> $H{$a}->[3]} grep({$H{$_}->[0] 
eq 'UP'} (keys %H))); + $max=0; + $f->print("TOP 10: "); + for (@k) { + $f->print(sprintf "%s (%4.2f) ",$_,$H{$_}->[3]); + last if $max++>=10; + } + $f->print("\n\n"); + + @k=sort({($H{$b}->[5]||0) <=> ($H{$a}->[5]||0)} grep({$H{$_}->[0] eq 'UP'} (keys %H))); + $max=0; + $f->print("TOP 10 CPU load : "); + for (@k) { + $f->print(sprintf "%s (%3.1f%%) ",$_,($H{$_}->[5]||0)*100); + last if $max++>=10; + } + $f->print("\n\n"); + + @k=sort({($H{$b}->[6]||0) <=> ($H{$a}->[6]||0)} grep({$H{$_}->[0] eq 'UP'} (keys %H))); + $max=0; + my $total_bogomips=0; + $f->print("TOP 10 free capacity : "); + for (@k) { + my $bogo=$H{$_}->[6]||0; + $f->print(sprintf "%s (%3.1f) ",$_,$bogo) if $max++<10; + $total_bogomips+=$bogo + } + $f->printf("\ntotal available bogomips: %d\n\n",$total_bogomips); } sub msg_text { - my ($text)=@_; - warn($text."\n"); - my $time=scalar(localtime); - main::mgmt_print_all($time.': '.$text."\n"); + my ($text)=@_; + warn($text."\n"); + my $time=scalar(localtime); + main::mgmt_print_all($time.': '.$text."\n"); } sub msg_state { - my ($state_old,$state_new,$hostname,$extra)=@_; - msg_text(sprintf "%-4s -> %-4s %-20s %s",$state_old,$state_new,$hostname,$extra); + my ($state_old,$state_new,$hostname,$extra)=@_; + msg_text(sprintf "%-4s -> %-4s %-20s %s",$state_old,$state_new,$hostname,$extra); } sub init { - restore_state; - msg_text('node monitor: started. (recovery mode)'); - My::Select::timeout(630,sub{$MONITOR_STARTING=0;msg_text('node monitor: recovery finished');}); - My::Select::timeout(630,\&timeout_hosts); + restore_state; + msg_text('node monitor: started. 
(recovery mode)'); + My::Select::timeout(630,sub{$MONITOR_STARTING=0;msg_text('node monitor: recovery finished');}); + My::Select::timeout(630,\&timeout_hosts); } sub timeout_hosts { - My::Select::timeout_requeue(60); - my $timeout=My::Select::time()-1230; # 2x10 minutes + 30 seconds + My::Select::timeout_requeue(60); + my $timeout=My::Select::time()-1230; # 2x10 minutes + 30 seconds - for (keys %H) { - my $h=$H{$_}; - if ($h->[0] eq 'UP' && $h->[1]<=$timeout) { - msg_state('UP','DOWN',$_,"timeout!"); - $h->[0]='DOWN'; - } - } - save_state(); + for (keys %H) { + my $h=$H{$_}; + if ($h->[0] eq 'UP' && $h->[1]<=$timeout) { + msg_state('UP','DOWN',$_,"timeout!"); + $h->[0]='DOWN'; + } + } + save_state(); } sub rx_hostannounce { - # ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev) - - my ($host,$seq,@more)=@_; - my ($load_average,$version,$pload,$pcapacity,$unixrev)=@more; - $unixrev ||= '?'; - unless (exists $H{$host}) { - if ($seq==0) { - msg_state('NEW','UP',$host,"discovered new node $version - $unixrev"); - } else { - $MONITOR_STARTING or msg_state('NEW','UP',$host,"discovered running node $version - $unixrev"); - } - $H{$host}=['UP',My::Select::time,$seq+1,@more]; - } else { - my $h=$H{$host}; - if ($h->[0] eq 'UP') { - if ($seq == $h->[2]) { - ; - } else { - if ($seq==0) { - # msg_state('UP','UP',$host,"node rebootet $version - $unixrev"); - - } elsif ($seq>$h->[2]) { - $MONITOR_STARTING or msg_state('UP','UP',$host,$seq-$h->[2]." packet(s) lost!"); - } elsif ($seq+1==$h->[2]) { - warn "DUPLICATE from $host\n"; - $DUPLICATES{$host}=($DUPLICATES{$host}||0)+1; - } else { - msg_state('UP','UP',$host,"node rebooted and ".$seq." packet(s) lost!"); - } - } - } else { - if ($seq==0) { - msg_state('DOWN','UP',$host,"node rebooted $version - $unixrev"); - } elsif ($seq==$h->[2]) { - msg_state('DOWN','UP',$host,"sequence error. Node to slow? (seq=$seq,exp=".$h->[2].")"); - } else { - msg_state('DOWN','UP',$host,$seq-$h->[2]." 
packet(s) lost"); - } - } - @$h=('UP',My::Select::time,$seq+1,@more); - } + # ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev) + + my ($host,$seq,@more)=@_; + my ($load_average,$version,$pload,$pcapacity,$unixrev)=@more; + $unixrev ||= '?'; + unless (exists $H{$host}) { + if ($seq==0) { + msg_state('NEW','UP',$host,"discovered new node $version - $unixrev"); + } else { + $MONITOR_STARTING or msg_state('NEW','UP',$host,"discovered running node $version - $unixrev"); + } + $H{$host}=['UP',My::Select::time,$seq+1,@more]; + } else { + my $h=$H{$host}; + if ($h->[0] eq 'UP') { + if ($seq == $h->[2]) { + ; + } else { + if ($seq==0) { + # msg_state('UP','UP',$host,"node rebootet $version - $unixrev"); + } elsif ($seq>$h->[2]) { + $MONITOR_STARTING or msg_state('UP','UP',$host,$seq-$h->[2]." packet(s) lost!"); + } elsif ($seq+1==$h->[2]) { + warn "DUPLICATE from $host\n"; + $DUPLICATES{$host}=($DUPLICATES{$host}||0)+1; + } else { + msg_state('UP','UP',$host,"node rebooted and ".$seq." packet(s) lost!"); + } + } + } else { + if ($seq==0) { + msg_state('DOWN','UP',$host,"node rebooted $version - $unixrev"); + } elsif ($seq==$h->[2]) { + msg_state('DOWN','UP',$host,"sequence error. Node to slow? (seq=$seq,exp=".$h->[2].")"); + } else { + msg_state('DOWN','UP',$host,$seq-$h->[2]." 
packet(s) lost"); + } + } + @$h=('UP',My::Select::time,$seq+1,@more); + } } sub delete_host { - my ($host)=@_; - my $h=delete $H{$host} or return; - msg_text("host $host removed from monitor"); + my ($host)=@_; + my $h=delete $H{$host} or return; + msg_text("host $host removed from monitor"); } #----------------------------------------------------------------------- @@ -662,68 +661,68 @@ our $TCP_MAX=1024; our $DAY_LAST_MSG; sub day { - my @f=localtime; - return sprintf "%04d%02d%02d",$f[5]+1900,$f[4]+1,$f[3]; + my @f=localtime; + return sprintf "%04d%02d%02d",$f[5]+1900,$f[4]+1,$f[3]; } sub bigben { - my $day=day(); - $day le $DAY_LAST_MSG and return; - syslog('warning', "NETLOG ==================================================== morning has broken ====\n"); - $DAY_LAST_MSG=$day; + my $day=day(); + $day le $DAY_LAST_MSG and return; + syslog('warning', "NETLOG ==================================================== morning has broken ====\n"); + $DAY_LAST_MSG=$day; } sub bigben_timer { - bigben(); - My::Select::timeout_requeue(30); + bigben(); + My::Select::timeout_requeue(30); } sub bigben_init { - $DAY_LAST_MSG=day(); - My::Select::timeout(30,\&bigben_timer); + $DAY_LAST_MSG=day(); + My::Select::timeout(30,\&bigben_timer); } sub receive { - my ($socket,$peernode,$bufref)=@_; - my $data; + my ($socket,$peernode,$bufref)=@_; + my $data; - bigben(); + bigben(); - defined $socket->recv($data,$TCP_MAX) or return; - # length $data or warn "$peernode: disconnect\n"; - length $data or return; + defined $socket->recv($data,$TCP_MAX) or return; + # length $data or warn "$peernode: disconnect\n"; + length $data or return; - $$bufref.=$data; + $$bufref.=$data; - while (1) { - last if length($$bufref)<2; - my $l=unpack('n',$$bufref); - # warn "wait for $l+2 bytes got ".length($$bufref)."\n"; - last if length($$bufref)<2+$l; - my $msg=substr($$bufref,2,$l); - $$bufref=substr($$bufref,2+$l); - syslog('warning', "NETLOG $msg\n") unless $msg=~/NETLOG/; - } - 
My::Select::reader_requeue(); + while (1) { + last if length($$bufref)<2; + my $l=unpack('n',$$bufref); + # warn "wait for $l+2 bytes got ".length($$bufref)."\n"; + last if length($$bufref)<2+$l; + my $msg=substr($$bufref,2,$l); + $$bufref=substr($$bufref,2+$l); + syslog('warning', "NETLOG $msg\n") unless $msg=~/NETLOG/; + } + My::Select::reader_requeue(); } sub connect_request { - My::Select::reader_requeue(); + My::Select::reader_requeue(); - my $socket=$listen_socket->accept(); - $socket->blocking(0); + my $socket=$listen_socket->accept(); + $socket->blocking(0); - my $peernode=$socket->peerhost; - my $buffer=''; - My::Select::reader($socket,sub{receive($socket,$peernode,\$buffer)}); - # warn "$peernode: connect\n"; + my $peernode=$socket->peerhost; + my $buffer=''; + My::Select::reader($socket,sub{receive($socket,$peernode,\$buffer)}); + # warn "$peernode: connect\n"; } sub init { - $listen_socket=new IO::Socket::INET(Proto=>'tcp',LocalPort=>1028,Listen=>1,ReuseAddr=>1) or die "$!\n"; - My::Select::reader($listen_socket,\&connect_request); - bigben_init(); + $listen_socket=new IO::Socket::INET(Proto=>'tcp',LocalPort=>1028,Listen=>1,ReuseAddr=>1) or die "$!\n"; + My::Select::reader($listen_socket,\&connect_request); + bigben_init(); } #------------------------------------------------------------------------ @@ -734,33 +733,33 @@ use Sys::Syslog; use IO::Socket::INET; use Data::Dumper; -our $UDP_MAX=1472; # for broadcast on alphas +our $UDP_MAX=1472; # for broadcast on alphas our $UDP_PORT=234; -our $BC_RATE=8; # packets per second broadcast -our $TCP_TIMEOUT=30; # default timeout for tcp processing +our $BC_RATE = 8; # packets per second broadcast +our $TCP_TIMEOUT=30; # default timeout for tcp processing -our (%options); # RUN OPTIONS +our (%options); # RUN OPTIONS -our $donald_s; # My::Select::INET udp socket +our $donald_s; # My::Select::INET udp socket our $my_hostname; -our $my_ip; # '141.14.12.12' +our $my_ip; # '141.14.12.12' $my_hostname=lc 
`/bin/hostname`; chomp($my_hostname); $my_hostname =~ s/\.molgen\.mpg\.de$//; while (1) { - my $addr=inet_aton($my_hostname); - if(defined $addr) { - $my_ip=inet_ntoa(inet_aton($my_hostname)); - } - last if defined $my_ip; - my $once; - unless ($once) { - warn "no IP (yet)= - waiting\n"; - $once++; - } - sleep 30; + my $addr=inet_aton($my_hostname); + if(defined $addr) { + $my_ip=inet_ntoa(inet_aton($my_hostname)); + } + last if defined $my_ip; + my $once; + unless ($once) { + warn "no IP (yet)= - waiting\n"; + $once++; + } + sleep 30; } our $my_unixrev; @@ -771,200 +770,200 @@ our $CLUSTER_PW; our $CLUSTER_PW_FILE='/etc/clusterd.password'; our $CLUSTER_PW_TIMESTAMP=0; -$ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}:''); # for ps , tar (gnu!) +$ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}:''); # for ps , tar (gnu!) #---------------------------------------------------------- UDP -our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234) +our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234) -our %UDP_HANDLER= +our %UDP_HANDLER = ( - 'filedata' => \&udp_rx_filedata, - 'filedata.2' => \&udp_rx_filedata, - 'amdtardata' => \&udp_rx_amdtardata, - 'loadavg.2' => \&udp_rx_loadavg2, - 'restart' => \&udp_rx_restart, - 'flush-gidcache' => \&udp_rx_flush_gidcache, - 'make-automaps' => \&udp_rx_make_automaps, - 'reexport' => \&udp_rx_reexport, - 'log' => \&udp_rx_log, - 'exec' => \&udp_rx_exec, - 'push' => \&udp_rx_push, + 'filedata' => \&udp_rx_filedata, + 'filedata.2' => \&udp_rx_filedata, + 'amdtardata' => \&udp_rx_amdtardata, + 'loadavg.2' => \&udp_rx_loadavg2, + 'restart' => \&udp_rx_restart, + 'flush-gidcache' => \&udp_rx_flush_gidcache, + 'make-automaps' => \&udp_rx_make_automaps, + 'reexport' => \&udp_rx_reexport, + 'log' => \&udp_rx_log, + 'exec' => \&udp_rx_exec, + 'push' => \&udp_rx_push, ); sub udp_message { - my ($data,$x_udp_peer_addr,$x_udp_peer_port,$donald_s)=@_; + my 
($data,$x_udp_peer_addr,$x_udp_peer_port,$donald_s)=@_; - ($udp_peer_addr,$udp_peer_port)=($x_udp_peer_addr,$x_udp_peer_port); + ($udp_peer_addr,$udp_peer_port)=($x_udp_peer_addr,$x_udp_peer_port); - defined $CLUSTER_PW or return; + defined $CLUSTER_PW or return; - my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return; - $UDP_HANDLER{$handler_name}->(@args) if exists $UDP_HANDLER{$handler_name}; + my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return; + $UDP_HANDLER{$handler_name}->(@args) if exists $UDP_HANDLER{$handler_name}; } -sub udp_send_message { # udp_send_message( dst, @args) # dst='141.14.31.255' 'zork' '141.14.16.1' etc. data is anything - my ($ip,@args)=@_; - defined $CLUSTER_PW or return; - $donald_s->send_data($ip,$UDP_PORT,sign($CLUSTER_PW,encode(@args))); +sub udp_send_message { # udp_send_message( dst, @args) # dst='141.14.31.255' 'zork' '141.14.16.1' etc. data is anything + my ($ip,@args)=@_; + defined $CLUSTER_PW or return; + $donald_s->send_data($ip,$UDP_PORT,sign($CLUSTER_PW,encode(@args))); } #---------------------------------------------------------- sub push_amd_tar { - my ($donald_s)=@_; - my $filename='/tmp/amd.tar'; - - my $pid=fork; - defined $pid or return warn "$!\n"; - unless($pid) { - chdir '/etc/amd' or die "/etc/amd: $!\n"; - exec 'tar','cf',$filename,'.'; - die "$!\n"; - } - wait; - $? and return; - - my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; - my $digest=Digest::MD5->new->addfile($fh)->digest; - warn "tar digest is ",Digest::MD5::md5_hex($digest),"\n"; - - $pid=fork; - defined $pid or return warn "$!\n"; - unless($pid) { - exec 'gzip','-f',$filename; - die "$!\n"; - } - wait; - $? 
and return; - - $filename='/tmp/amd.tar.gz'; - - my $st=Donald::FileInfo->lstat($filename); - defined $st or return warn "$filename: $!\n"; - $st->type eq 'F' or return warn "$filename: not a plain file\n"; - - $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; - - my $i=0; - for (my $pos=0;$pos<$st->size;$pos+=1024) { - my $data; - defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; - # warn "send bytes $pos to ",$pos+length($data),"\n"; - ++$i % $BC_RATE or sleep 1; - udp_broadcast_message($donald_s,'amdtardata',$st,$pos,$data,$digest); - } + my ($donald_s)=@_; + my $filename='/tmp/amd.tar'; + + my $pid=fork; + defined $pid or return warn "$!\n"; + unless($pid) { + chdir '/etc/amd' or die "/etc/amd: $!\n"; + exec 'tar','cf',$filename,'.'; + die "$!\n"; + } + wait; + $? and return; + + my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; + my $digest=Digest::MD5->new->addfile($fh)->digest; + warn "tar digest is ",Digest::MD5::md5_hex($digest),"\n"; + + $pid=fork; + defined $pid or return warn "$!\n"; + unless($pid) { + exec 'gzip','-f',$filename; + die "$!\n"; + } + wait; + $? 
and return; + + $filename='/tmp/amd.tar.gz'; + + my $st=Donald::FileInfo->lstat($filename); + defined $st or return warn "$filename: $!\n"; + $st->type eq 'F' or return warn "$filename: not a plain file\n"; + + $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; + + my $i=0; + for (my $pos=0;$pos<$st->size;$pos+=1024) { + my $data; + defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; + # warn "send bytes $pos to ",$pos+length($data),"\n"; + ++$i % $BC_RATE or sleep 1; + udp_broadcast_message($donald_s,'amdtardata',$st,$pos,$data,$digest); + } } sub push_file { - my ($donald_s,$filename)=@_; - - $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; - - my $st=Donald::FileInfo->lstat($filename); - defined $st or return warn "$filename: $!\n"; - my $rpc; - if ($st->type eq 'F') { - $rpc='filedata'; - $st->size<=80000 or die "$filename: to big for broadcast (max 80000 bytes)\n"; - if ($st->size==0) { - udp_broadcast_message($donald_s,$rpc,$st,0,''); - return; - } - - my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; - my $i=0; - for (my $pos=0;$pos<$st->size;$pos+=1024) { - my $data; - defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; - # warn "send bytes $pos to ",$pos+length($data),"\n"; - udp_broadcast_message($donald_s,$rpc,$st,$pos,$data); - ++$i % $BC_RATE or sleep 1; - } - } elsif ($st->type eq 'L') { - $rpc='filedata.2'; - udp_broadcast_message($donald_s,$rpc,$st,0,''); - return; - } else { - die "file type not supported\n"; - } + my ($donald_s,$filename)=@_; + + $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; + + my $st=Donald::FileInfo->lstat($filename); + defined $st or return warn "$filename: $!\n"; + my $rpc; + if ($st->type eq 'F') { + $rpc='filedata'; + $st->size<=80000 or die "$filename: to big for broadcast (max 80000 bytes)\n"; + if ($st->size==0) { + udp_broadcast_message($donald_s,$rpc,$st,0,''); + return; + } + + my $fh=new IO::File 
$filename,'<' or return warn "$filename: $!\n"; + my $i=0; + for (my $pos=0;$pos<$st->size;$pos+=1024) { + my $data; + defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; + # warn "send bytes $pos to ",$pos+length($data),"\n"; + udp_broadcast_message($donald_s,$rpc,$st,$pos,$data); + ++$i % $BC_RATE or sleep 1; + } + } elsif ($st->type eq 'L') { + $rpc='filedata.2'; + udp_broadcast_message($donald_s,$rpc,$st,0,''); + return; + } else { + die "file type not supported\n"; + } } our %CMD=( - 'mkmotd'=>'/usr/sbin/mkmotd.pl', + 'mkmotd'=>'/usr/sbin/mkmotd.pl', ); sub send_exec { - my ($donald_s,$cmd)=@_; - unless (exists $CMD{$cmd}) { - die "available commands: ",join(' , ',keys %CMD),"\n"; - } - udp_broadcast_message($donald_s,'exec',$cmd); + my ($donald_s,$cmd)=@_; + unless (exists $CMD{$cmd}) { + die "available commands: ",join(' , ',keys %CMD),"\n"; + } + udp_broadcast_message($donald_s,'exec',$cmd); } sub udp_rx_exec { - my ($cmd)=@_; - - warn "exec $cmd\n"; - exists $CMD{$cmd} or return; - - my $pid; - $pid=fork; - unless (defined $pid) { - warn "$!\n"; - return; - } - unless ($pid) { - $pid=fork; - defined $pid or exit 1; - $pid and exit; - - open STDIN,'<','/dev/null'; - open STDOUT,'>','/dev/null'; - open STDERR,'>','/dev/null'; - alarm(60); - chdir '/'; - exec '/bin/sh','-c',$CMD{$cmd}; - exit 1; - } - wait; + my ($cmd)=@_; + + warn "exec $cmd\n"; + exists $CMD{$cmd} or return; + + my $pid; + $pid=fork; + unless (defined $pid) { + warn "$!\n"; + return; + } + unless ($pid) { + $pid=fork; + defined $pid or exit 1; + $pid and exit; + + open STDIN,'<','/dev/null'; + open STDOUT,'>','/dev/null'; + open STDERR,'>','/dev/null'; + alarm(60); + chdir '/'; + exec '/bin/sh','-c',$CMD{$cmd}; + exit 1; + } + wait; } #------------------------------------------------------------- -sub normalize_seg { # [pos,len],[pos,len],... - my @s=sort {$a->[0] <=> $b->[0]} @_; +sub normalize_seg { # [pos,len],[pos,len],... 
+ my @s=sort {$a->[0] <=> $b->[0]} @_; - my $i=0; - while ($i<$#s) { - # is element $i joinable with next element + my $i=0; + while ($i<$#s) { + # is element $i joinable with next element - my $end_0=$s[$i]->[0]+$s[$i]->[1]; - if ($end_0 >= $s[$i+1]->[0] ) { - my $end_1=$s[$i+1]->[0]+$s[$i+1]->[1]; - $s[$i]->[1] = ($end_0>$end_1 ? $end_0 : $end_1)-$s[$i]->[0]; - splice @s,$i+1,1; - } else { - $i++; - } - } - return @s; + my $end_0=$s[$i]->[0]+$s[$i]->[1]; + if ($end_0 >= $s[$i+1]->[0] ) { + my $end_1=$s[$i+1]->[0]+$s[$i+1]->[1]; + $s[$i]->[1] = ($end_0>$end_1 ? $end_0 : $end_1)-$s[$i]->[0]; + splice @s,$i+1,1; + } else { + $i++; + } + } + return @s; } -my %RECEIVER; # ( filename => $receiver, .... ) +my %RECEIVER; # ( filename => $receiver, .... ) # $receiver : [ st_want , last_rx , io_handle , [ [pos,len] , [pos,len] , ... ] ] sub purge_old_receiver { - while (my ($n,$v)=each %RECEIVER) { - if ($v->[1]+10[0]->name,"\n"; - log_to_stat_target('timeout receiving ',$v->[0]->name); - delete $RECEIVER{$n}; - } - } - My::Select::timeout_requeue(60); + while (my ($n,$v)=each %RECEIVER) { + if ($v->[1]+10[0]->name,"\n"; + log_to_stat_target('timeout receiving ',$v->[0]->name); + delete $RECEIVER{$n}; + } + } + My::Select::timeout_requeue(60); } #------------------------------------------------------------- @@ -974,239 +973,239 @@ our $INSTALLED_DIGEST=''; our $rx_filedata_done; sub udp_rx_amdtardata { - my ($st_want,$pos,$data,$digest)=@_; + my ($st_want,$pos,$data,$digest)=@_; - ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo'; + ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo'; - ### $digest eq $INSTALLED_DIGEST and $pos==0 and warn "/etc/amd - ",Digest::MD5::md5_hex($digest)," already installed\n"; - $digest eq $INSTALLED_DIGEST and return; + ### $digest eq $INSTALLED_DIGEST and $pos==0 and warn "/etc/amd - ",Digest::MD5::md5_hex($digest)," already installed\n"; + $digest eq $INSTALLED_DIGEST and return; - #### $pos==0 and warn 
"receiving /etc/amd - ",Digest::MD5::md5_hex($digest),"\n"; + #### $pos==0 and warn "receiving /etc/amd - ",Digest::MD5::md5_hex($digest),"\n"; - udp_rx_filedata($st_want,$pos,$data); - if ($rx_filedata_done) { - my $pid=fork; - defined $pid or return warn "$!\n"; - unless($pid) { - chdir '/etc/amd' or die "/etc/amd: $!\n"; - exec 'tar','xzf',$st_want->name; - die "$!\n"; - } - } - wait; - $? and return; + udp_rx_filedata($st_want,$pos,$data); + if ($rx_filedata_done) { + my $pid=fork; + defined $pid or return warn "$!\n"; + unless($pid) { + chdir '/etc/amd' or die "/etc/amd: $!\n"; + exec 'tar','xzf',$st_want->name; + die "$!\n"; + } + } + wait; + $? and return; - warn "installed /etc/amd - ",Digest::MD5::md5_hex($digest),"\n"; - $INSTALLED_DIGEST=$digest; - system '/sbin/make-automaps'; + warn "installed /etc/amd - ",Digest::MD5::md5_hex($digest),"\n"; + $INSTALLED_DIGEST=$digest; + system '/sbin/make-automaps'; } our ($machine,$SYS_lchown,$SYS_mknod,$lmtime_sub); our ($SYS_utimensat,$AT_FDCWD,$UTIME_OMIT,$AT_SYMLINK_NOFOLLOW); sub lmtime_unsupported { - my ($path,$mtime)=@_; - warn "$path: don't known how to change symlink mtime on target architecture\n"; + my ($path,$mtime)=@_; + warn "$path: don't known how to change symlink mtime on target architecture\n"; } sub lmtime_utimensat { - my ($path,$mtime)=@_; - my $tsa=pack 'qqqq',0,$UTIME_OMIT,$mtime,0; - syscall($SYS_utimensat,$AT_FDCWD,$path,$tsa,$AT_SYMLINK_NOFOLLOW)==0 or return warn "$path: failed to lmtime: $!\n"; + my ($path,$mtime)=@_; + my $tsa=pack 'qqqq',0,$UTIME_OMIT,$mtime,0; + syscall($SYS_utimensat,$AT_FDCWD,$path,$tsa,$AT_SYMLINK_NOFOLLOW)==0 or return warn "$path: failed to lmtime: $!\n"; } $lmtime_sub=\&lmtime_unsupported; chomp($machine=`uname -m`); if ($machine eq 'i686') { - $SYS_lchown=198; # __NR_lchown32 in /usr/include/asm/unistd.h - $SYS_mknod=14; # __NR_mknod + $SYS_lchown=198; # __NR_lchown32 in /usr/include/asm/unistd.h + $SYS_mknod=14; # __NR_mknod } elsif ($machine eq 'x86_64') { - 
$SYS_lchown=94; # __NR_lchown in /usr/include/asm-x86_64/unistd.h - $SYS_mknod=133; # __NR_mknod - - $SYS_utimensat=280; # /usr/include/asm/unistd_64.h - $AT_FDCWD=-100; # /usr/include/fcntl.h - $UTIME_OMIT=(1<<30)-2; # /usr/include/bits/stat.h - $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/fcntl.h - $lmtime_sub=\&lmtime_utimensat; + $SYS_lchown=94; # __NR_lchown in /usr/include/asm-x86_64/unistd.h + $SYS_mknod=133; # __NR_mknod + + $SYS_utimensat=280; # /usr/include/asm/unistd_64.h + $AT_FDCWD=-100; # /usr/include/fcntl.h + $UTIME_OMIT=(1<<30)-2; # /usr/include/bits/stat.h + $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/fcntl.h + $lmtime_sub=\&lmtime_utimensat; } elsif ($machine eq 'alpha') { - $SYS_lchown=208; # SYS_lchown in /usr/include/syscall.h - $SYS_mknod=14; # SYS_mknod + $SYS_lchown=208; # SYS_lchown in /usr/include/syscall.h + $SYS_mknod=14; # SYS_mknod } elsif ($machine eq 'amd64') { - $SYS_lchown=254; # SYS_lchown in /usr/include/syscall.h - $SYS_mknod=14; # SYS_mknod + $SYS_lchown=254; # SYS_lchown in /usr/include/syscall.h + $SYS_mknod=14; # SYS_mknod } elsif ($machine eq 'ppc64le') { - $SYS_lchown=16; # __NR_lchown in /usr/include/powerpc64le-linux-gnu/asm/unistd.h - $SYS_mknod=14; # __NR_mknod - - $SYS_utimensat=304; # __NR_utimensat in /usr/include/powerpc64le-linux-gnu/asm/unistd.h - $AT_FDCWD=-100; # /usr/include/linux/fcntl.h - $UTIME_OMIT=(1<<30)-2; # /usr/include/powerpc64le-linux-gnu/bits/stat.h - $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/linux/fcntl.h - $lmtime_sub=\&lmtime_utimensat; + $SYS_lchown=16; # __NR_lchown in /usr/include/powerpc64le-linux-gnu/asm/unistd.h + $SYS_mknod=14; # __NR_mknod + + $SYS_utimensat=304; # __NR_utimensat in /usr/include/powerpc64le-linux-gnu/asm/unistd.h + $AT_FDCWD=-100; # /usr/include/linux/fcntl.h + $UTIME_OMIT=(1<<30)-2; # /usr/include/powerpc64le-linux-gnu/bits/stat.h + $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/linux/fcntl.h + $lmtime_sub=\&lmtime_utimensat; } else { - warn "unknown machine type $machine: 
symlink ownership can't be set.\n"; - warn "unknown machine type $machine: named pipes,character and block devices can't be created\n"; + warn "unknown machine type $machine: symlink ownership can't be set.\n"; + warn "unknown machine type $machine: named pipes,character and block devices can't be created\n"; } sub lchown { - my ($uid,$gid,$path)=@_; - $SYS_lchown or return; - syscall($SYS_lchown,$path,$uid+0,$gid+0)==0 or return warn "$path: failed to lchown: $!\n"; + my ($uid,$gid,$path)=@_; + $SYS_lchown or return; + syscall($SYS_lchown,$path,$uid+0,$gid+0)==0 or return warn "$path: failed to lchown: $!\n"; } sub lmtime { - my ($mtime,$path)=@_; - $lmtime_sub or return; - $lmtime_sub->($path,$mtime); + my ($mtime,$path)=@_; + $lmtime_sub or return; + $lmtime_sub->($path,$mtime); } sub udp_rx_filedata { # set rx_filedata_done as a side effect - my ($st_want,$pos,$data)=@_; - - ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo'; - - my $filename=$st_want->name; - my $tmp_filename="$filename.tmp"; - - $rx_filedata_done=0; - - my $st_is=Donald::FileInfo->lstat($st_want->name); - if ($st_is && $st_is->type eq 'F' && $st_is->size==$st_want->size && $st_is->mtime==$st_want->mtime) { - #### $pos==0 and warn " $filename seems to be current\n"; - return; - } - - if ($st_want->type eq 'L') { - if (!$st_is || $st_is->type ne 'L' || $st_is->target ne $st_want->target) { - $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n"); - symlink($st_want->target,$filename) or return warn "$filename: failed to create symlink: $!\n"; - lchown($st_want->uid,$st_want->gid,$filename); - lmtime($st_want->mtime,$filename); - warn "installed $filename -> ".$st_want->target."\n"; - } else { - if ($st_is->uid != $st_want->uid || $st_is->gid != $st_want->gid) { - lchown($st_want->uid,$st_want->gid,$filename); - } - if ($st_is->mtime != $st_want->mtime) { - lmtime($st_want->mtime,$filename); - } - } - return; - } - - if (length($data) == 
$st_want->size) { - # complete file in one broadcast - -e $tmp_filename and unlink($tmp_filename); - my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - if ($st_want->size) { - $fh->syswrite($data) or return warn "$tmp_filename: $!\n"; - $fh->sync(); - } - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; - warn "installed $filename\n"; - $rx_filedata_done=1; - return; - } - - length($data) or return; # shouldn't happen. - - my $receiver=$RECEIVER{$st_want->name}; - - if (defined $receiver) { - if ( $receiver->[0]->size != $st_want->size or $receiver->[0]->mtime != $st_want->mtime ) { - $receiver=undef; - } - } - - unless (defined $receiver) { - # create new receiver - ## warn "start receiving $filename from $udp_peer_addr\n"; - -e $tmp_filename and unlink($tmp_filename); - my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - $receiver = [$st_want,My::Select::time,$fh,[]]; - $RECEIVER{$filename}=$receiver; - } - - { - # warn "$filename: receive $pos length ",length($data),"\n"; - - # write data ( size cant be 0 here ) - $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n"; - $receiver->[2]->print($data) or return warn "$tmp_filename: $!\n";; - $receiver->[1]=My::Select::time; - my $s=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])]; - - #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n"; - - # all there ? 
- - if (@$s == 1 && $s->[0]->[0]==0 && $s->[0]->[1]==$st_want->size) { - $receiver->[2]->flush(); - $receiver->[2]->sync(); - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; - warn "installed $filename\n"; - delete $RECEIVER{$filename}; - $rx_filedata_done=1; - } - } + my ($st_want,$pos,$data)=@_; + + ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo'; + + my $filename=$st_want->name; + my $tmp_filename="$filename.tmp"; + + $rx_filedata_done=0; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + if ($st_is && $st_is->type eq 'F' && $st_is->size==$st_want->size && $st_is->mtime==$st_want->mtime) { + #### $pos==0 and warn " $filename seems to be current\n"; + return; + } + + if ($st_want->type eq 'L') { + if (!$st_is || $st_is->type ne 'L' || $st_is->target ne $st_want->target) { + $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n"); + symlink($st_want->target,$filename) or return warn "$filename: failed to create symlink: $!\n"; + lchown($st_want->uid,$st_want->gid,$filename); + lmtime($st_want->mtime,$filename); + warn "installed $filename -> ".$st_want->target."\n"; + } else { + if ($st_is->uid != $st_want->uid || $st_is->gid != $st_want->gid) { + lchown($st_want->uid,$st_want->gid,$filename); + } + if ($st_is->mtime != $st_want->mtime) { + lmtime($st_want->mtime,$filename); + } + } + return; +} + + if (length($data) == $st_want->size) { + # complete file in one broadcast + -e $tmp_filename and unlink($tmp_filename); + my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + defined $fh or return warn "$tmp_filename: $!\n"; + if ($st_want->size) { + $fh->syswrite($data) or return warn "$tmp_filename: $!\n"; + $fh->sync(); + } + chown 
$st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; + warn "installed $filename\n"; + $rx_filedata_done=1; + return; + } + + length($data) or return; # shouldn't happen. + + my $receiver=$RECEIVER{$st_want->name}; + + if (defined $receiver) { + if ( $receiver->[0]->size != $st_want->size or $receiver->[0]->mtime != $st_want->mtime ) { + $receiver=undef; + } + } + + unless (defined $receiver) { + # create new receiver + ## warn "start receiving $filename from $udp_peer_addr\n"; + -e $tmp_filename and unlink($tmp_filename); + my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + defined $fh or return warn "$tmp_filename: $!\n"; + $receiver = [$st_want,My::Select::time,$fh,[]]; + $RECEIVER{$filename}=$receiver; + } + + { + # warn "$filename: receive $pos length ",length($data),"\n"; + + # write data ( size cant be 0 here ) + $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n"; + $receiver->[2]->print($data) or return warn "$tmp_filename: $!\n";; + $receiver->[1]=My::Select::time; + my $s=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])]; + + #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n"; + + # all there ? 
+ + if (@$s == 1 && $s->[0]->[0]==0 && $s->[0]->[1]==$st_want->size) { + $receiver->[2]->flush(); + $receiver->[2]->sync(); + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; + warn "installed $filename\n"; + delete $RECEIVER{$filename}; + $rx_filedata_done=1; + } + } } #----------------------------------------------------------- sample running proc every 10 seconds -our @proc_running=(0)x12; # 12 samples every 5 seconds -> minute average -our @proc_running_10=(0)x10; # 10 samples every minute -> 10 minute average +our @proc_running=(0)x12; # 12 samples every 5 seconds -> minute average +our @proc_running_10=(0)x10; # 10 samples every minute -> 10 minute average our $SAMPLE_TICK=0; sub xget_cur_running_proc { - open L,'<','/proc/stat' or return 0; - while () { - # warn $_; - return $1 if /^procs_running\s*(\d+)/; - } - return 0; + open L,'<','/proc/stat' or return 0; + while () { + # warn $_; + return $1 if /^procs_running\s*(\d+)/; + } + return 0; } sub get_cur_running_proc { - our $CPUS; - my $r=xget_cur_running_proc(); - if ($CPUS && $r>$CPUS+1) { - pload_debug("more than $CPUS+1 proc"); - } - return $r; + our $CPUS; + my $r=xget_cur_running_proc(); + if ($CPUS && $r>$CPUS+1) { + pload_debug("more than $CPUS+1 proc"); + } + return $r; } sub running_proc { - my $ret=0; - $ret+=$_ for @proc_running; - return $ret/@proc_running; + my $ret=0; + $ret+=$_ for @proc_running; + return $ret/@proc_running; } sub running_proc_10 { - my $ret=0; - $ret+=$_ for @proc_running_10; - return $ret/@proc_running_10; + my $ret=0; + $ret+=$_ for @proc_running_10; + return $ret/@proc_running_10; } -sub sample_rproc { # every 5 seconds - @proc_running=(@proc_running[1..@proc_running-1],get_cur_running_proc()-1); - if 
($SAMPLE_TICK<12) { - $SAMPLE_TICK++; - } else { - @proc_running_10=(@proc_running_10[1..@proc_running_10-1],running_proc()); - $SAMPLE_TICK=0; - } - My::Select::timeout_requeue(5); +sub sample_rproc { # every 5 seconds + @proc_running=(@proc_running[1..@proc_running-1],get_cur_running_proc()-1); + if ($SAMPLE_TICK<12) { + $SAMPLE_TICK++; + } else { + @proc_running_10=(@proc_running_10[1..@proc_running_10-1],running_proc()); + $SAMPLE_TICK=0; + } + My::Select::timeout_requeue(5); } #----------------------------------------------------------- stat @@ -1214,93 +1213,93 @@ sub sample_rproc { # every 5 seconds our ($CPUS,$BOGOMIPS); sub init_cpuinfo { - Donald::Tools::is_alpha and return; - open L,'<','/proc/cpuinfo' or return; - while () { - if (/^bogomips\s*:\s*([\d.]+)/) {$CPUS++;$BOGOMIPS+=$1;} - } -} - -sub loadavg { # AXP : (system load average) , LINUX: (system load average, pload, freebogo) - if (Donald::Tools::is_alpha) { - my $data=pack 'l!3lx4l!3'; - my $i=syscall(85,3,0,$data,1,length($data)); # table(id=TBL_LOADAVG,index=0,addr=data,nel=1,lel=56) - my ($l0,$l1,$l2,$scale,$mach0,$mach1,$mach2)=unpack 'l!3lx4l!3',$data; - $scale or return undef; - return $l0/$scale; - } else { - $CPUS or init_cpuinfo(); - my $running_proc=running_proc(); - open L,'<','/proc/loadavg' or return undef; - my $data; - sysread(L,$data,1024); - close L; - $data =~ /^(\d+\.?\d*)/ or return undef; # 5 min loadavg - if ($CPUS) { - return ($1+0,($running_proc)/$CPUS,($running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS)); - } else { - return $1+0; - } - } + Donald::Tools::is_alpha and return; + open L,'<','/proc/cpuinfo' or return; + while () { + if (/^bogomips\s*:\s*([\d.]+)/) {$CPUS++;$BOGOMIPS+=$1;} + } +} + +sub loadavg { # AXP : (system load average) , LINUX: (system load average, pload, freebogo) + if (Donald::Tools::is_alpha) { + my $data=pack 'l!3lx4l!3'; + my $i=syscall(85,3,0,$data,1,length($data)); # table(id=TBL_LOADAVG,index=0,addr=data,nel=1,lel=56) + my 
($l0,$l1,$l2,$scale,$mach0,$mach1,$mach2)=unpack 'l!3lx4l!3',$data; + $scale or return undef; + return $l0/$scale; + } else { + $CPUS or init_cpuinfo(); + my $running_proc=running_proc(); + open L,'<','/proc/loadavg' or return undef; + my $data; + sysread(L,$data,1024); + close L; + $data =~ /^(\d+\.?\d*)/ or return undef; # 5 min loadavg + if ($CPUS) { + return ($1+0,($running_proc)/$CPUS,($running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS)); + } else { + return $1+0; + } + } } our $STAT_TARGET='afk'; our $STAT_SEQ=0; sub send_stat { - My::Select::timeout_requeue(600); - my ($load_avg,$pload,$pcapacity)=loadavg(); - defined $load_avg or return; - udp_send_message($STAT_TARGET,'loadavg.2',$my_hostname,$STAT_SEQ++,$load_avg,version_info(),$pload,$pcapacity,$my_unixrev); + My::Select::timeout_requeue(600); + my ($load_avg,$pload,$pcapacity)=loadavg(); + defined $load_avg or return; + udp_send_message($STAT_TARGET,'loadavg.2',$my_hostname,$STAT_SEQ++,$load_avg,version_info(),$pload,$pcapacity,$my_unixrev); } sub udp_rx_loadavg2 { - my ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)=@_; - $my_hostname eq $STAT_TARGET and My::Cluster::Updown::rx_hostannounce($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev) + my ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)=@_; + $my_hostname eq $STAT_TARGET and My::Cluster::Updown::rx_hostannounce($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev) } sub udp_rx_log { - my ($msg)=@_; - My::Cluster::Updown::msg_text($msg); + my ($msg)=@_; + My::Cluster::Updown::msg_text($msg); } sub log_to_stat_target { - my ($msg)=join '',@_; - udp_send_message($STAT_TARGET,'log',"$my_hostname: $msg"); + my ($msg)=join '',@_; + udp_send_message($STAT_TARGET,'log',"$my_hostname: $msg"); } # ---------------------------------------------------------- sub udp_rx_restart { - # double-fork, because kill_previous_server() won't kill its 
parent - my $pid=fork; - if (defined $pid && $pid==0) { - my $pid2=fork; - if (defined $pid2 && $pid==0) { - exec '/usr/sbin/clusterd','--kill','--daemon'; - die "exec failed: $!\n"; - } - } + # double-fork, because kill_previous_server() won't kill its parent + my $pid=fork; + if (defined $pid && $pid==0) { + my $pid2=fork; + if (defined $pid2 && $pid==0) { + exec '/usr/sbin/clusterd','--kill','--daemon'; + die "exec failed: $!\n"; + } + } } sub udp_rx_flush_gidcache { - if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') { - print $out time(); - } else { - warn "proc/net/rpc/auth.unix.gid/flush: $!\n"; - } + if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') { + print $out time(); + } else { + warn "proc/net/rpc/auth.unix.gid/flush: $!\n"; + } } sub udp_rx_make_automaps { - if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') { - print $out time(); - } else { - warn "proc/net/rpc/auth.unix.gid/flush: $!\n"; - } - system '/sbin/make-automaps'; + if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') { + print $out time(); + } else { + warn "proc/net/rpc/auth.unix.gid/flush: $!\n"; + } + system '/sbin/make-automaps'; } sub udp_rx_reexport { - system '/usr/bin/mxmount --reexport-only'; + system '/usr/bin/mxmount --reexport-only'; } #----------- tcp mgmt console ----------------------------- @@ -1311,79 +1310,79 @@ our $mgmt_listen_socket; our %mgmt_sockets; sub mgmt_init { - $mgmt_listen_socket=new IO::Socket::INET(LocalPort=>$MGMT_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); - defined $mgmt_listen_socket or die "$!\n"; - My::Select::reader($mgmt_listen_socket,\&mgmt_connect_request); + $mgmt_listen_socket=new IO::Socket::INET(LocalPort=>$MGMT_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); + defined $mgmt_listen_socket or die "$!\n"; + My::Select::reader($mgmt_listen_socket,\&mgmt_connect_request); } sub mgmt_connect_request { - My::Select::reader_requeue(); + My::Select::reader_requeue(); - # listen socket ready + # listen socket ready - my 
$socket=$mgmt_listen_socket->accept(); - $socket->blocking(0); - my $peernode=$socket->peerhost; + my $socket=$mgmt_listen_socket->accept(); + $socket->blocking(0); + my $peernode=$socket->peerhost; - ### warn "accepted mgmt connection from $peernode\n"; + ### warn "accepted mgmt connection from $peernode\n"; - My::Select::reader($socket,sub{mgmt_receive($socket)}); - $mgmt_sockets{$socket}=$socket; - $socket->print("clusterd ".version_info()." stupid console\n"); - $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n"); + My::Select::reader($socket,sub{mgmt_receive($socket)}); + $mgmt_sockets{$socket}=$socket; + $socket->print("clusterd ".version_info()." stupid console\n"); + $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n"); } sub mgmt_receive { - my ($s)=@_; - my $data; - my $len=$s->sysread($data,1024); - if (!defined $len or $len==0 or $len==1 && $data eq "\cD") { - delete $mgmt_sockets{$s}; - $s->close; - ###warn "lost connection to mgmt port\n"; - return; - } - - My::Select::reader_requeue(); - $data=~s/\r?\n$//; - length $data or return; - if ($data eq 'l') { - $my_hostname eq $STAT_TARGET and My::Cluster::Updown::status($s); - $s->print("AREA: ",area_config_as_string(),"\n"); - $s->print("STAT TARGET: ",$STAT_TARGET,"\n"); - $s->print(' (',scalar(localtime),')',"\n"); - } - elsif ($data eq 'r') { - for my $host (sort keys %H) { - if ($H{$host}->[0] eq 'UP') { - $s->printf("%-40s : %s\n",$host,$H{$host}->[7]); - } - } - } elsif ($data eq 'v') { - $Data::Dumper::Terse=1; - $Data::Dumper::Indent=0; - for (sort keys %H) { - $s->printf("%15s : %s\n",$_,Dumper($H{$_})); - } - } elsif ($data eq 'd') { - my $running_proc=running_proc(); - my $running_proc_10=running_proc_10(); - $s->printf("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc); - 
$s->printf("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10); - $CPUS or init_cpuinfo; - $CPUS and $s->printf("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS); - $CPUS and $s->printf("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS); - } elsif ($data =~ /^delete (\S+)$/) { - My::Cluster::Updown::delete_host($1); - } elsif ($data eq 'dup show') { - for my $h (sort {$DUPLICATES{$b} <=> $DUPLICATES{$a} } keys %DUPLICATES) { - $s->printf("%-10s : %5d\n",$h,$DUPLICATES{$h}); - } - } elsif ($data eq 'dup clear') { - %DUPLICATES=(); - } else { - $s->print(<<"_EOF_"); + my ($s)=@_; + my $data; + my $len=$s->sysread($data,1024); + if (!defined $len or $len==0 or $len==1 && $data eq "\cD") { + delete $mgmt_sockets{$s}; + $s->close; + ###warn "lost connection to mgmt port\n"; + return; + } + + My::Select::reader_requeue(); + $data=~s/\r?\n$//; + length $data or return; + if ($data eq 'l') { + $my_hostname eq $STAT_TARGET and My::Cluster::Updown::status($s); + $s->print("AREA: ",area_config_as_string(),"\n"); + $s->print("STAT TARGET: ",$STAT_TARGET,"\n"); + $s->print(' (',scalar(localtime),')',"\n"); + } + elsif ($data eq 'r') { + for my $host (sort keys %H) { + if ($H{$host}->[0] eq 'UP') { + $s->printf("%-40s : %s\n",$host,$H{$host}->[7]); + } + } + } elsif ($data eq 'v') { + $Data::Dumper::Terse=1; + $Data::Dumper::Indent=0; + for (sort keys %H) { + $s->printf("%15s : %s\n",$_,Dumper($H{$_})); + } + } elsif ($data eq 'd') { + my $running_proc=running_proc(); + my $running_proc_10=running_proc_10(); + $s->printf("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc); + $s->printf("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} 
@proc_running_10)),$running_proc_10); + $CPUS or init_cpuinfo; + $CPUS and $s->printf("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS); + $CPUS and $s->printf("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS); + } elsif ($data =~ /^delete (\S+)$/) { + My::Cluster::Updown::delete_host($1); + } elsif ($data eq 'dup show') { + for my $h (sort {$DUPLICATES{$b} <=> $DUPLICATES{$a} } keys %DUPLICATES) { + $s->printf("%-10s : %5d\n",$h,$DUPLICATES{$h}); + } + } elsif ($data eq 'dup clear') { + %DUPLICATES=(); + } else { + $s->print(<<"_EOF_"); unknown command: $data l : list status @@ -1392,135 +1391,135 @@ d : debug (cpu speed calc) r : dump unix revisions delete HOST : forget about HOST -dup show : show duplicates stat -dup clear : clear duplicates stat +dup show : show duplicates stat +dup clear : clear duplicates stat to exit use ^D _EOF_ - } + } } sub mgmt_print_all { - my ($msg)=@_; + my ($msg)=@_; - $options{'foreground'} and not $options{'syslog'} and print $msg; + $options{'foreground'} and not $options{'syslog'} and print $msg; - for my $s (values(%mgmt_sockets)) { - $s->print($msg); - } + for my $s (values(%mgmt_sockets)) { + $s->print($msg); + } } #----------------------------------------------------------- sub clp_rx_LSOF { - my ($socket,$pattern)=@_; - - my $pid=fork; - unless (defined $pid) { - warn"$!\n"; - return; - } - unless ($pid) { - my $pid=fork; - defined $pid or die "$!\n"; - unless ($pid) { - $socket->blocking(1); - # -n inhibits the conversion of network numbers to host names for network files. - # -b causes lsof to avoid kernel functions that might block - lstat(2), readlink(2), and stat(2). - # -w disables warning messages. 
- open P,'timeout -k 92s 90s lsof -n -b -w|' or die "$!\n"; - while (

) { - next if defined $pattern && index($_,$pattern)<0; - $socket->send(pack('n',length($_)).$_,0); - } - close P; - if ($?) { - $_=sprintf("** lsof timout/error on %s\n",$my_hostname); - $socket->send(pack('n',length($_)).$_,0); - } - close $socket; - exit; - } - exit; - } - close $socket; - wait; - return 1; + my ($socket,$pattern)=@_; + + my $pid=fork; + unless (defined $pid) { + warn"$!\n"; + return; + } + unless ($pid) { + my $pid=fork; + defined $pid or die "$!\n"; + unless ($pid) { + $socket->blocking(1); + # -n inhibits the conversion of network numbers to host names for network files. + # -b causes lsof to avoid kernel functions that might block - lstat(2), readlink(2), and stat(2). + # -w disables warning messages. + open P,'timeout -k 92s 90s lsof -n -b -w|' or die "$!\n"; + while (

) { + next if defined $pattern && index($_,$pattern)<0; + $socket->send(pack('n',length($_)).$_,0); + } + close P; + if ($?) { + $_=sprintf("** lsof timout/error on %s\n",$my_hostname); + $socket->send(pack('n',length($_)).$_,0); + } + close $socket; + exit; + } + exit; + } + close $socket; + wait; + return 1; } sub run_cmd { - my ($socket,@cmd)=@_; - my $pid=fork; - unless (defined $pid) { - warn"$!\n"; - return; - } - unless ($pid) { - my $opipe=new IO::Pipe; - my $epipe=new IO::Pipe; - my $cpid=fork; - defined $cpid or die "$!\n"; - unless ($cpid) { - warn "exec ".join(' ',@cmd)."\n"; - $opipe->writer(); - $epipe->writer(); - open STDIN,'<','/dev/null'; - open STDOUT,'>&',$opipe; - open STDERR,'>&',$epipe; - exec @cmd; - die "$!\n"; - } -# $::SIG{'CHLD'}=sub { -# my $pid=wait; -# my $buffer="X$?"; -# $socket->send(pack('n',length($buffer)).$buffer,0); -# exit; -# }; - $opipe->reader(); - $epipe->reader(); - my $ofn=$opipe->fileno; - my $efn=$epipe->fileno; - my ($rvec_in,$wvec_in,$evec_in)=('','',''); - vec($rvec_in,$ofn,1)=1; - vec($rvec_in,$efn,1)=1; - my $channel=2; - my $buffer; - $socket->blocking(1); - while (1) { - my ($rvec,$wvec,$evec)=($rvec_in,$wvec_in,$evec_in); - my $ready=select($rvec,$wvec,$evec,60); - $ready or die "timeout\n"; - if (vec($rvec,$ofn,1)) { - my $len=$opipe->sysread($buffer,1024-1-2); - defined $len or die "$!\n"; - if ($len) { - $socket->send(pack('n',$len+1)."O$buffer",0); - } else { - vec($rvec_in,$ofn,1)=0; - $opipe->close; - $channel--; - } - } - if (vec($rvec,$efn,1)) { - my $len=$epipe->sysread($buffer,1024-1-2); - defined $len or die "$!\n"; - if ($len) { - $socket->send(pack('n',$len+1)."E$buffer",0); - } else { - vec($rvec_in,$efn,1)=0; - $epipe->close; - $channel--; - } - } - if ($channel==0) { - my $pid=wait; - my $buffer="X$?"; - $socket->send(pack('n',length($buffer)).$buffer,0); - exit; - } - } - } + my ($socket,@cmd)=@_; + my $pid=fork; + unless (defined $pid) { + warn"$!\n"; + return; + } + unless ($pid) { + my 
$opipe=new IO::Pipe; + my $epipe=new IO::Pipe; + my $cpid=fork; + defined $cpid or die "$!\n"; + unless ($cpid) { + warn "exec ".join(' ',@cmd)."\n"; + $opipe->writer(); + $epipe->writer(); + open STDIN,'<','/dev/null'; + open STDOUT,'>&',$opipe; + open STDERR,'>&',$epipe; + exec @cmd; + die "$!\n"; + } +# $::SIG{'CHLD'}=sub { +# my $pid=wait; +# my $buffer="X$?"; +# $socket->send(pack('n',length($buffer)).$buffer,0); +# exit; +# }; + $opipe->reader(); + $epipe->reader(); + my $ofn=$opipe->fileno; + my $efn=$epipe->fileno; + my ($rvec_in,$wvec_in,$evec_in)=('','',''); + vec($rvec_in,$ofn,1)=1; + vec($rvec_in,$efn,1)=1; + my $channel=2; + my $buffer; + $socket->blocking(1); + while (1) { + my ($rvec,$wvec,$evec)=($rvec_in,$wvec_in,$evec_in); + my $ready=select($rvec,$wvec,$evec,60); + $ready or die "timeout\n"; + if (vec($rvec,$ofn,1)) { + my $len=$opipe->sysread($buffer,1024-1-2); + defined $len or die "$!\n"; + if ($len) { + $socket->send(pack('n',$len+1)."O$buffer",0); + } else { + vec($rvec_in,$ofn,1)=0; + $opipe->close; + $channel--; + } + } + if (vec($rvec,$efn,1)) { + my $len=$epipe->sysread($buffer,1024-1-2); + defined $len or die "$!\n"; + if ($len) { + $socket->send(pack('n',$len+1)."E$buffer",0); + } else { + vec($rvec_in,$efn,1)=0; + $epipe->close; + $channel--; + } + } + if ($channel==0) { + my $pid=wait; + my $buffer="X$?"; + $socket->send(pack('n',length($buffer)).$buffer,0); + exit; + } + } + } } #----------- CLP cluster protocol ----------------------------- @@ -1531,70 +1530,70 @@ our $clp_listen_socket; our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF,'PULL'=>\&clp_rx_PULL); sub clp_init { - $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>128,ReuseAddr=>1); - defined $clp_listen_socket or die "$!\n"; - My::Select::reader($clp_listen_socket,\&clp_connect_request); + $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>128,ReuseAddr=>1); + defined $clp_listen_socket or die 
"$!\n"; + My::Select::reader($clp_listen_socket,\&clp_connect_request); } sub clp_connect_request { - My::Select::reader_requeue(); + My::Select::reader_requeue(); - # listen socket ready + # listen socket ready - my $socket=$clp_listen_socket->accept(); - $socket->blocking(0); - my $peernode=$socket->peerhost; + my $socket=$clp_listen_socket->accept(); + $socket->blocking(0); + my $peernode=$socket->peerhost; - my $buffer=''; + my $buffer=''; - My::Select::reader($socket,sub{clp_receive($socket,\$buffer)}); + My::Select::reader($socket,sub{clp_receive($socket,\$buffer)}); } sub clp_receive { - my ($s,$bufref)=@_; - my $data; - defined $s->recv($data,$TCP_MAX) or return; - if (!length($data) ) { - $s->close; - return; - } - $$bufref.=$data; - while (1) { - last if length($$bufref)<2; - my $l=unpack('n',$$bufref); - last if length($$bufref)<2+$l; - my $msg=substr($$bufref,2,$l); - $$bufref=substr($$bufref,2+$l); - clp_message($s,$msg) and return; - } - My::Select::reader_requeue(); + my ($s,$bufref)=@_; + my $data; + defined $s->recv($data,$TCP_MAX) or return; + if (!length($data) ) { + $s->close; + return; + } + $$bufref.=$data; + while (1) { + last if length($$bufref)<2; + my $l=unpack('n',$$bufref); + last if length($$bufref)<2+$l; + my $msg=substr($$bufref,2,$l); + $$bufref=substr($$bufref,2+$l); + clp_message($s,$msg) and return; + } + My::Select::reader_requeue(); } sub clp_message { - my ($socket,$data)=@_; + my ($socket,$data)=@_; - defined $CLUSTER_PW or return; + defined $CLUSTER_PW or return; - my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return; - $CLP_HANDLER{$handler_name}->($socket,@args) if exists $CLP_HANDLER{$handler_name}; + my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return; + $CLP_HANDLER{$handler_name}->($socket,@args) if exists $CLP_HANDLER{$handler_name}; } -sub clp_send_message { # clp_send_message($socket, @args) - my ($s,@args)=@_; - defined $CLUSTER_PW or return; - my 
$data=sign($CLUSTER_PW,encode(@args)); - unless ($s->peername) { - return 0; - } - $s->send(pack('n',length($data)).$data); +sub clp_send_message { # clp_send_message($socket, @args) + my ($s,@args)=@_; + defined $CLUSTER_PW or return; + my $data=sign($CLUSTER_PW,encode(@args)); + unless ($s->peername) { + return 0; + } + $s->send(pack('n',length($data)).$data); } sub clp_rx_CMD { - my ($socket,@args)=@_; - run_cmd($socket,@args); - close $socket; - return 1; + my ($socket,@args)=@_; + run_cmd($socket,@args); + close $socket; + return 1; } # send_tcp_cp($socket,$cb,$timeout,@args) @@ -1604,127 +1603,127 @@ sub clp_rx_CMD { # assume $CLUSTER_PW is valid # sub send_tcp_cp { - my ($s,$cb,$timeout,@args)=@_; - my $data=sign($CLUSTER_PW,encode(@args)); - My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb); + my ($s,$cb,$timeout,@args)=@_; + my $data=sign($CLUSTER_PW,encode(@args)); + My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb); } #---------------------------------------------------------- sub sync_cluster_pw { - my $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE); - - if ($st) { - if (!defined $CLUSTER_PW or $CLUSTER_PW_TIMESTAMP != $st->mtime) { - my $fh=new IO::File $CLUSTER_PW_FILE,'<'; - if (defined ($fh)) { - defined $CLUSTER_PW and warn "update cluster password\n"; - $fh->read($CLUSTER_PW,1024); - $CLUSTER_PW_TIMESTAMP=$st->mtime; - } else { - defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n"; - $CLUSTER_PW=undef; - } - } - } else { - defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n"; - $CLUSTER_PW=undef; - } - My::Select::timeout(60,\&sync_cluster_pw); - return defined $CLUSTER_PW; + my $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE); + + if ($st) { + if (!defined $CLUSTER_PW or $CLUSTER_PW_TIMESTAMP != $st->mtime) { + my $fh=new IO::File $CLUSTER_PW_FILE,'<'; + if (defined ($fh)) { + defined $CLUSTER_PW and warn "update cluster password\n"; + $fh->read($CLUSTER_PW,1024); + $CLUSTER_PW_TIMESTAMP=$st->mtime; + 
} else { + defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n"; + $CLUSTER_PW=undef; + } + } + } else { + defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n"; + $CLUSTER_PW=undef; + } + My::Select::timeout(60,\&sync_cluster_pw); + return defined $CLUSTER_PW; } #------------------------------------------------------------ # area routing -our %AREA_ROUTER= +our %AREA_ROUTER = ( - wtf => '141.14.31.255', + wtf => '141.14.31.255', ); our $area_socket; sub init_area { - exists $AREA_ROUTER{$my_hostname} or return; - warn "I am area router for $AREA_ROUTER{$my_hostname}\n"; + exists $AREA_ROUTER{$my_hostname} or return; + warn "I am area router for $AREA_ROUTER{$my_hostname}\n"; - $area_socket=new IO::Socket::INET (Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n"; - My::Select::reader($area_socket,sub{area_message($area_socket)}); + $area_socket=new IO::Socket::INET (Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n"; + My::Select::reader($area_socket,sub{area_message($area_socket)}); } sub area_message { - my ($area_socket)=@_; - my $data; - my $peer = $area_socket->recv($data,$UDP_MAX); - my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); - my $udp_peer_addr=inet_ntoa($peer_iaddr); + my ($area_socket)=@_; + my $data; + my $peer = $area_socket->recv($data,$UDP_MAX); + my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); + my $udp_peer_addr=inet_ntoa($peer_iaddr); - $donald_s->send_data($AREA_ROUTER{$my_hostname},$UDP_PORT,$data); # broadcast to our network - My::Select::reader_requeue(); + $donald_s->send_data($AREA_ROUTER{$my_hostname},$UDP_PORT,$data); # broadcast to our network + My::Select::reader_requeue(); } sub udp_broadcast_message { - my ($donald_s,@args)=@_; + my ($donald_s,@args)=@_; - defined $CLUSTER_PW or return; - my $data=sign($CLUSTER_PW,encode(@args)); - for my $ip (keys %AREA_ROUTER) { - $donald_s->send_data($ip,$UDP_PORT+1,$data); - } + defined $CLUSTER_PW or return; + my $data=sign($CLUSTER_PW,encode(@args)); + for my $ip (keys 
%AREA_ROUTER) { + $donald_s->send_data($ip,$UDP_PORT+1,$data); + } } sub area_config_as_string { - return join(',',map({"$_ => $AREA_ROUTER{$_}"} keys %AREA_ROUTER)) + return join(',',map({"$_ => $AREA_ROUTER{$_}"} keys %AREA_ROUTER)) } #------------------------------------------------------------ -our $PROG_FILE; # saved $0 - may be relative path +our $PROG_FILE; # saved $0 - may be relative path our $PROG_MTIME; sub check_progfile_status { - defined $PROG_FILE or $PROG_FILE=$0; - my @f=lstat $PROG_FILE or return; - $PROG_MTIME=$f[9]; + defined $PROG_FILE or $PROG_FILE=$0; + my @f=lstat $PROG_FILE or return; + $PROG_MTIME=$f[9]; } -sub version_info { # '20090617-155314' - my $t; - if (defined $PROG_MTIME) { - my @f=localtime($PROG_MTIME); - return sprintf '%4d%02d%02d-%02d%02d%02d',$f[5]+1900,$f[4]+1,$f[3],$f[2],$f[1],$f[0]; - } else { - return '?'; - } +sub version_info { # '20090617-155314' + my $t; + if (defined $PROG_MTIME) { + my @f=localtime($PROG_MTIME); + return sprintf '%4d%02d%02d-%02d%02d%02d',$f[5]+1900,$f[4]+1,$f[3],$f[2],$f[1],$f[0]; + } else { + return '?'; + } } #------------------------------------------------------------ sub pload_debug { - my ($why)=@_; - open T,'>','/var/log/pload_debug.txt' or return; - print T "$why at ".scalar(localtime),"\n"; - my $running_proc=running_proc(); - my $running_proc_10=running_proc_10(); - printf T ("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc); - printf T ("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10); - $CPUS or init_cpuinfo; - $CPUS and printf T ("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS); - $CPUS and printf T ("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS); - system 
"uptime>>/var/log/pload_debug.txt"; - system "ps -AlfT>>/var/log/pload_debug.txt"; - system "cat /proc/stat>>/var/log/pload_debug.txt"; - system "cat /proc/swaps>>/var/log/pload_debug.txt"; - close T; + my ($why)=@_; + open T,'>','/var/log/pload_debug.txt' or return; + print T "$why at ".scalar(localtime),"\n"; + my $running_proc=running_proc(); + my $running_proc_10=running_proc_10(); + printf T ("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc); + printf T ("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10); + $CPUS or init_cpuinfo; + $CPUS and printf T ("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS); + $CPUS and printf T ("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS); + system "uptime>>/var/log/pload_debug.txt"; + system "ps -AlfT>>/var/log/pload_debug.txt"; + system "cat /proc/stat>>/var/log/pload_debug.txt"; + system "cat /proc/swaps>>/var/log/pload_debug.txt"; + close T; } sub check_overload() { - My::Select::timeout_requeue(600); - if ($CPUS) { - my $running_proc_10=running_proc_10(); - my $pload_10=running_proc_10()/$CPUS; - $pload_10>1.5 and warn (sprintf("pload>150%% (%3.1f%%)\n",$pload_10*100)); - } + My::Select::timeout_requeue(600); + if ($CPUS) { + my $running_proc_10=running_proc_10(); + my $pload_10=running_proc_10()/$CPUS; + $pload_10>1.5 and warn (sprintf("pload>150%% (%3.1f%%)\n",$pload_10*100)); + } } ##################################################### @@ -1733,253 +1732,253 @@ my $slave=0; my $exit_value=0; sub expand_hostconfig_hosts { - my ($filter)=@_; - open my $p,'-|','hostconfig','--list',$filter or die "hostconfig: $!\”n"; - return split ' ',readline($p); + my ($filter)=@_; + open my $p,'-|','hostconfig','--list',$filter 
or die "hostconfig: $!\”n"; + return split ' ',readline($p); } sub exec_at { - my ($host,@cmd)=@_; - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - my $s=new IO::Socket::INET(PeerAddr=>$host,PeerPort=>$CLP_PORT); - unless (defined $s) { - die "$host: $!\n"; - } - clp_send_message($s,'CMD',@cmd); - my $pbuffer=''; - my $olbuffer=''; - my $elbuffer=''; - My::Select::reader($s,sub{cmd_rx($host,$s,\$pbuffer,\$olbuffer,\$elbuffer)}); - $slave=1; - My::Select::run() if $slave;; + my ($host,@cmd)=@_; + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + my $s=new IO::Socket::INET(PeerAddr=>$host,PeerPort=>$CLP_PORT); + unless (defined $s) { + die "$host: $!\n"; + } + clp_send_message($s,'CMD',@cmd); + my $pbuffer=''; + my $olbuffer=''; + my $elbuffer=''; + My::Select::reader($s,sub{cmd_rx($host,$s,\$pbuffer,\$olbuffer,\$elbuffer)}); + $slave=1; + My::Select::run() if $slave;; } sub lsof { - my ($pattern)=@_; - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - for my $host (sort(expand_hostconfig_hosts('amd'))) { - next if $host eq 'tux'; - my $s=new IO::Socket::INET (PeerAddr=>$host,PeerPort=>$CLP_PORT); - unless (defined $s) { - warn "$host: $!\n"; - next; - } - clp_send_message($s,'LSOF',$pattern); - my $pbuffer=''; - my $olbuffer=''; - My::Select::reader($s,sub{lsof_rx($host,$s,\$pbuffer,\$olbuffer)}); - $slave++; - } - My::Select::run() if $slave;; + my ($pattern)=@_; + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + for my $host (sort(expand_hostconfig_hosts('amd'))) { + next if $host eq 'tux'; + my $s=new IO::Socket::INET (PeerAddr=>$host,PeerPort=>$CLP_PORT); + unless (defined $s) { + warn "$host: $!\n"; + next; + } + clp_send_message($s,'LSOF',$pattern); + my $pbuffer=''; + my $olbuffer=''; + My::Select::reader($s,sub{lsof_rx($host,$s,\$pbuffer,\$olbuffer)}); + $slave++; + } + My::Select::run() if $slave;; } sub lsof_rx { - my ($host,$s,$pbufref,$olbufref)=@_; - my $data; - defined $s->recv($data,$TCP_MAX*2) or return; - if (!length($data)) { - 
close $s; - --$slave; - exit $exit_value unless $slave; - return; - } - $$pbufref.=$data; - while (1) { - last if length($$pbufref)<2; - my $l=unpack('n',$$pbufref); - last if length($$pbufref)<2+$l; - my $msg=substr($$pbufref,2,$l); - $$pbufref=substr($$pbufref,2+$l); - lsof_msg($host,$s,$msg,$olbufref); - } - My::Select::reader_requeue(); + my ($host,$s,$pbufref,$olbufref)=@_; + my $data; + defined $s->recv($data,$TCP_MAX*2) or return; + if (!length($data)) { + close $s; + --$slave; + exit $exit_value unless $slave; + return; + } + $$pbufref.=$data; + while (1) { + last if length($$pbufref)<2; + my $l=unpack('n',$$pbufref); + last if length($$pbufref)<2+$l; + my $msg=substr($$pbufref,2,$l); + $$pbufref=substr($$pbufref,2+$l); + lsof_msg($host,$s,$msg,$olbufref); + } + My::Select::reader_requeue(); } sub lsof_msg { - my ($host,$s,$data,$olbufref)=@_; - my $msg=$$olbufref.$data; - for ($msg=~/([^\n]*\n)/gs) { - print "$host $_"; - } - ($$olbufref)=$msg=~/([^\n]*)\z/; + my ($host,$s,$data,$olbufref)=@_; + my $msg=$$olbufref.$data; + for ($msg=~/([^\n]*\n)/gs) { + print "$host $_"; + } + ($$olbufref)=$msg=~/([^\n]*)\z/; } sub cmd_rx { - my ($host,$s,$pbufref,$olbufref,$elbufref)=@_; - my $data; - defined $s->recv($data,$TCP_MAX*2) or return; - if (!length($data)) { - close $s; - --$slave; - exit $exit_value unless $slave; - return; - } - $$pbufref.=$data; - while (1) { - last if length($$pbufref)<2; - my $l=unpack('n',$$pbufref); - last if length($$pbufref)<2+$l; - my $msg=substr($$pbufref,2,$l); - $$pbufref=substr($$pbufref,2+$l); - cmd_msg($host,$s,$msg,$olbufref,$elbufref); - } - My::Select::reader_requeue(); + my ($host,$s,$pbufref,$olbufref,$elbufref)=@_; + my $data; + defined $s->recv($data,$TCP_MAX*2) or return; + if (!length($data)) { + close $s; + --$slave; + exit $exit_value unless $slave; + return; + } + $$pbufref.=$data; + while (1) { + last if length($$pbufref)<2; + my $l=unpack('n',$$pbufref); + last if length($$pbufref)<2+$l; + my 
$msg=substr($$pbufref,2,$l); + $$pbufref=substr($$pbufref,2+$l); + cmd_msg($host,$s,$msg,$olbufref,$elbufref); + } + My::Select::reader_requeue(); } sub cmd_msg { - my ($host,$s,$data,$olbufref,$elbufref)=@_; - - my $channel=substr($data,0,1); - if ($channel eq 'X') { - my $rdata=substr($data,1); - $rdata!=0 and $exit_value=1; - return; - } elsif ($channel eq 'O') { - my $msg=$$olbufref.substr($data,1); - for ($msg=~/([^\n]*\n)/gs) { - print "$host: $_"; - } - ($$olbufref)=$msg=~/([^\n]*)\z/; - } elsif ($channel eq 'E') { - my $msg=$$elbufref.substr($data,1); - for ($msg=~/([^\n]*\n)/gs) { - print STDERR "$host: $_"; - } - ($$elbufref)=$msg=~/([^\n]*)\z/; - } -} - -our $SYS_SENDFILE=40; # /usr/include/asm/unistd_64.h + my ($host,$s,$data,$olbufref,$elbufref)=@_; + + my $channel=substr($data,0,1); + if ($channel eq 'X') { + my $rdata=substr($data,1); + $rdata!=0 and $exit_value=1; + return; + } elsif ($channel eq 'O') { + my $msg=$$olbufref.substr($data,1); + for ($msg=~/([^\n]*\n)/gs) { + print "$host: $_"; + } + ($$olbufref)=$msg=~/([^\n]*)\z/; + } elsif ($channel eq 'E') { + my $msg=$$elbufref.substr($data,1); + for ($msg=~/([^\n]*\n)/gs) { + print STDERR "$host: $_"; + } + ($$elbufref)=$msg=~/([^\n]*)\z/; + } +} + +our $SYS_SENDFILE = 40; # /usr/include/asm/unistd_64.h sub sendfile { - my ($out_fd,$in_fd,$offset,$count)=@_; - my $ret=syscall($SYS_SENDFILE,fileno($out_fd),fileno($in_fd),$offset,$count); - return $ret<0 ? undef : $ret; + my ($out_fd,$in_fd,$offset,$count)=@_; + my $ret=syscall($SYS_SENDFILE,fileno($out_fd),fileno($in_fd),$offset,$count); + return $ret<0 ? undef : $ret; } sub clp_rx_PULL { - my ($s,$st_want)=@_; - - my $st_is=Donald::FileInfo->lstat($st_want->name); - if (!defined $st_is or $st_is->type ne 'F' or $st_is->size != $st_want->size or $st_is->mtime != $st_want->mtime) { - warn $st_want->name." 
requested by ".$s->peerhost.": no longer available\n"; - return; - } - my $fh; - unless (open $fh,'<',$st_want->name) { - warn $st_want->name.": $!\n"; - return; - } - my $bytes=$st_is->size; - my $cb_tmo=sub { My::Select::cancel_handle($s); }; - my $cb_write=sub { - my $l=sendfile($s,$fh,0,$bytes); - unless (defined $l) { - warn "$!"; - return; - } - $bytes-=$l; - if ($bytes) { - My::Select::writer_requeue if $bytes; - } else { - close($s); - } - }; - My::Select::timeout(5,$cb_tmo); - My::Select::writer($s,$cb_write); + my ($s,$st_want)=@_; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + if (!defined $st_is or $st_is->type ne 'F' or $st_is->size != $st_want->size or $st_is->mtime != $st_want->mtime) { + warn $st_want->name." requested by ".$s->peerhost.": no longer available\n"; + return; + } + my $fh; + unless (open $fh,'<',$st_want->name) { + warn $st_want->name.": $!\n"; + return; + } + my $bytes=$st_is->size; + my $cb_tmo=sub { My::Select::cancel_handle($s); }; + my $cb_write=sub { + my $l=sendfile($s,$fh,0,$bytes); + unless (defined $l) { + warn "$!"; + return; + } + $bytes-=$l; + if ($bytes) { + My::Select::writer_requeue if $bytes; + } else { + close($s); + } + }; + My::Select::timeout(5,$cb_tmo); + My::Select::writer($s,$cb_write); } sub udp_rx_push { - my ($ip,$st_want)=@_; - - my $filename=$st_want->name; - my $tmp_filename="$filename.tmp"; - - $ip eq $my_ip and return; - - my $st_is=Donald::FileInfo->lstat($st_want->name); - - unless ($st_want->type eq 'F') { - warn "$filename: type ".$st_want->type." 
not yet implemented\n"; - return; - } - - if ($st_is - && $st_is->type eq 'F' - && $st_is->size == $st_want->size - && $st_is->mtime == $st_want->mtime - && $st_is->uid == $st_want->uid - && $st_is->gid == $st_want->gid - && $st_is->perm == $st_want->perm - ) { - warn "$filename: already okay\n"; - return; - } - - if ($st_want->size==0) { - -e $tmp_filename and unlink($tmp_filename); - my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - # no need to fsync empty file - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n";; - warn "installed (empty) $filename\n"; - return; - } - - my $s; - $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,$TCP_TIMEOUT,sub { - $! and return warn "$ip: $!\n"; - send_tcp_cp($s,sub { - $! and return warn "$ip: $!\n"; - -e $tmp_filename and unlink($tmp_filename); - my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - - my $cb; - my $bytes=$st_want->size; - $cb=sub { - # note, we need to break the circular references $cb of our caller, if no longer needed - my ($buf)=@_; - if ($!) 
{ warn "$ip: $!\n";$cb=undef;return; } - if (length($buf)==0) { warn "$ip: EOF\n";$cb=undef;return;} - $fh->print($buf) or return warn "$tmp_filename: $!\n"; - $bytes-=length($buf); - if ($bytes>0) { - My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); - return; - } - $cb=undef; - $fh->flush(); - $fh->sync(); - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; - warn "installed $filename\n"; - }; - My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); - },$TCP_TIMEOUT,'PULL',$st_want); - }); + my ($ip,$st_want)=@_; + + my $filename=$st_want->name; + my $tmp_filename="$filename.tmp"; + + $ip eq $my_ip and return; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + + unless ($st_want->type eq 'F') { + warn "$filename: type ".$st_want->type." 
not yet implemented\n"; + return; + } + + if ($st_is + && $st_is->type eq 'F' + && $st_is->size == $st_want->size + && $st_is->mtime == $st_want->mtime + && $st_is->uid == $st_want->uid + && $st_is->gid == $st_want->gid + && $st_is->perm == $st_want->perm + ) { + warn "$filename: already okay\n"; + return; + } + + if ($st_want->size==0) { + -e $tmp_filename and unlink($tmp_filename); + my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + defined $fh or return warn "$tmp_filename: $!\n"; + # no need to fsync empty file + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n";; + warn "installed (empty) $filename\n"; + return; + } + + my $s; + $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,$TCP_TIMEOUT,sub { + $! and return warn "$ip: $!\n"; + send_tcp_cp($s,sub { + $! and return warn "$ip: $!\n"; + -e $tmp_filename and unlink($tmp_filename); + my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + defined $fh or return warn "$tmp_filename: $!\n"; + + my $cb; + my $bytes=$st_want->size; + $cb=sub { + # note, we need to break the circular references $cb of our caller, if no longer needed + my ($buf)=@_; + if ($!) 
{ warn "$ip: $!\n";$cb=undef;return; } + if (length($buf)==0) { warn "$ip: EOF\n";$cb=undef;return;} + $fh->print($buf) or return warn "$tmp_filename: $!\n"; + $bytes-=length($buf); + if ($bytes>0) { + My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); + return; + } + $cb=undef; + $fh->flush(); + $fh->sync(); + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; + warn "installed $filename\n"; + }; + My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); + },$TCP_TIMEOUT,'PULL',$st_want); + }); } sub cmd_push { - my @files=@_; - for my $filename (@files) { - $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; - -e $filename or die "$filename: no such file\n"; - } - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - for my $filename (@files) { - my $st=Donald::FileInfo->lstat($filename); - defined $st or die "$filename: $!\n"; - $st->type eq 'F' or die "$filename: only plain files currently supported\n"; - open my $test,'<',$filename or die "$filename: $!\n"; - udp_broadcast_message($donald_s,'push',$my_ip,$st); - } + my @files=@_; + for my $filename (@files) { + $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; + -e $filename or die "$filename: no such file\n"; + } + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + for my $filename (@files) { + my $st=Donald::FileInfo->lstat($filename); + defined $st or die "$filename: $!\n"; + $st->type eq 'F' or die "$filename: only plain files currently supported\n"; + open my $test,'<',$filename or die "$filename: $!\n"; + udp_broadcast_message($donald_s,'push',$my_ip,$st); + } } 
#------------------------------------------------------------ @@ -1988,23 +1987,23 @@ our $TRUSTCHECK_PORT=236; our $trustcheck_listen_socket; sub trustcheck_init { - $trustcheck_listen_socket=new IO::Socket::INET(LocalPort=>$TRUSTCHECK_PORT,Proto=>'tcp',Listen=>10,ReuseAddr=>1); - defined $trustcheck_listen_socket or die "$!\n"; - My::Select::reader($trustcheck_listen_socket,\&trustcheck_connect_request); + $trustcheck_listen_socket=new IO::Socket::INET(LocalPort=>$TRUSTCHECK_PORT,Proto=>'tcp',Listen=>10,ReuseAddr=>1); + defined $trustcheck_listen_socket or die "$!\n"; + My::Select::reader($trustcheck_listen_socket,\&trustcheck_connect_request); } sub trustcheck_connect_request { - My::Select::reader_requeue(); - my $socket=$trustcheck_listen_socket->accept(); - $socket->blocking(0); - my $hostname = gethostbyaddr(inet_aton($socket->peerhost()), AF_INET); - system 'hostconfig','--host',$hostname,'amd'; - if ($? == 0) { - $socket->send("I trust you\n", 0); - } elsif ($? == 256) { - $socket->send("I don't trust you\n", 0); - } - close($socket); + My::Select::reader_requeue(); + my $socket=$trustcheck_listen_socket->accept(); + $socket->blocking(0); + my $hostname = gethostbyaddr(inet_aton($socket->peerhost()), AF_INET); + system 'hostconfig','--host',$hostname,'amd'; + if ($? == 0) { + $socket->send("I trust you\n", 0); + } elsif ($? 
== 256) { + $socket->send("I don't trust you\n", 0); + } + close($socket); } #------------------------------------------------------------ @@ -2013,7 +2012,7 @@ use constant USAGE => <<'__EOF__'; usage: $0 [options] - --push file # broadcast this file + --push file # broadcast this file --push-amd-tar # broadcast /etc/amd --send-restart # broadcast a restart request to all nodes --exec mkmotd # execute /usr/sbin/mkmotd.pl on all nodes @@ -2033,89 +2032,89 @@ __EOF__ use Getopt::Long; GetOptions ( - 'daemon' => \$options{'daemon'}, - 'push=s' => \$options{'push'}, - 'exec=s' => \$options{'exec'}, - 'push-amd-tar' => \$options{'push_amd_tar'}, - 'send-restart' => \$options{'send-restart'}, - 'flush-gidcache' => \$options{'flush-gidcache'}, - 'make-automaps' => \$options{'make-automaps'}, - 'reexport' => \$options{'reexport'}, - 'lsof=s' => \$options{'lsof'}, + 'daemon' => \$options{'daemon'}, + 'push=s' => \$options{'push'}, + 'exec=s' => \$options{'exec'}, + 'push-amd-tar' => \$options{'push_amd_tar'}, + 'send-restart' => \$options{'send-restart'}, + 'flush-gidcache' => \$options{'flush-gidcache'}, + 'make-automaps' => \$options{'make-automaps'}, + 'reexport' => \$options{'reexport'}, + 'lsof=s' => \$options{'lsof'}, ) or die USAGE; if (defined $options{'push'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - push_file($donald_s,$options{'push'}); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + push_file($donald_s,$options{'push'}); } elsif (defined $options{'exec'}) { - if (substr($options{'exec'},0,1) eq '@') { - length(length($options{'exec'})>1) or die USAGE; - @ARGV>=1 or die USAGE; - exec_at(substr($options{'exec'},1),@ARGV); - } else { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - send_exec($donald_s,$options{'exec'}); - } + if 
(substr($options{'exec'},0,1) eq '@') { + length(length($options{'exec'})>1) or die USAGE; + @ARGV>=1 or die USAGE; + exec_at(substr($options{'exec'},1),@ARGV); + } else { + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + send_exec($donald_s,$options{'exec'}); + } } elsif (defined $options{'push_amd_tar'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - push_amd_tar($donald_s); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + push_amd_tar($donald_s); } elsif (defined $options{'send-restart'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'restart'); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s,'restart'); } elsif (defined $options{'flush-gidcache'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'flush-gidcache'); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s,'flush-gidcache'); } elsif (defined $options{'make-automaps'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'make-automaps'); + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s,'make-automaps'); } elsif (defined $options{'reexport'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'reexport'); + 
sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s,'reexport'); } elsif (defined $options{'daemon'}) { - $SIG{PIPE}='IGNORE'; + $SIG{PIPE}='IGNORE'; - $donald_s=new My::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n"; - $donald_s->receive_data(\&udp_message,$donald_s); + $donald_s=new My::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n"; + $donald_s->receive_data(\&udp_message,$donald_s); - openlog('clusterd','pid','daemon'); - Sys::Syslog::setlogsock('unix'); # with 'native' we get EOLs in the logfile, option "noeol" doesn't work + openlog('clusterd','pid','daemon'); + Sys::Syslog::setlogsock('unix'); # with 'native' we get EOLs in the logfile, option "noeol" doesn't work - check_progfile_status(); - warn "server started - ".version_info()."\n"; - init_area(); - mgmt_init(); - clp_init(); - trustcheck_init(); + check_progfile_status(); + warn "server started - ".version_info()."\n"; + init_area(); + mgmt_init(); + clp_init(); + trustcheck_init(); - sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n"; + sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n"; - My::Select::timeout(60,\&purge_old_receiver); - My::Select::timeout(rand(60),\&send_stat); - My::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha; - $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init(); - $my_hostname eq 'macheteinfach' and My::NetlogReceiver::init(); - My::Select::timeout(600,\&check_overload); + My::Select::timeout(60,\&purge_old_receiver); + My::Select::timeout(rand(60),\&send_stat); + My::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha; + $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init(); + $my_hostname eq 'macheteinfach' and My::NetlogReceiver::init(); + My::Select::timeout(600,\&check_overload); - My::Select::run(); + My::Select::run(); } elsif ($options{'lsof'}) { - lsof($options{'lsof'}); + 
lsof($options{'lsof'}); } elsif ($options{'kill'}) { - Donald::Tools::kill_previous_server('clusterd'); + Donald::Tools::kill_previous_server('clusterd'); } else { - @ARGV or die USAGE; - my ($cmd,@args)=@ARGV; - if ($cmd eq 'push') { - @args>0 or die USAGE; - cmd_push(@args); - } else { - die USAGE; - } + @ARGV or die USAGE; + my ($cmd,@args)=@ARGV; + if ($cmd eq 'push') { + @args>0 or die USAGE; + cmd_push(@args); + } else { + die USAGE; + } } From 856d68227184ab755fac97d1ad6b1bf62c64569d Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Mon, 27 Jan 2025 09:20:21 +0100 Subject: [PATCH 2/7] clusterd: Move brackets --- clusterd/clusterd | 51 ++++++++++++++++------------------------------- 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/clusterd/clusterd b/clusterd/clusterd index 818a145..2851a3f 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -228,24 +228,21 @@ package My::Select; our $time=Donald::Tools::uptime(); -sub My::Select::time -{ +sub My::Select::time { return $time; } our @TIMER=(); # ( [duetime,cb] , ...
) sorted by time our $active_timer_cb; -sub timeout # cb=Select::timeout(seconds,cb) -{ +sub timeout { # cb=Select::timeout(seconds,cb) my ($delta,$cb)=@_; my $duetime=$time+$delta; @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); return $cb; } -sub timeout_cancel # cb=Select::timeout_cancel(cb) -{ +sub timeout_cancel { # cb=Select::timeout_cancel(cb) my ($cb)=@_; @TIMER=grep {$_->[1] != $cb} @TIMER; return $cb; @@ -260,22 +257,19 @@ our @WRITER; our @EXCEPT; our $active_io; # [Handle,cb] -sub reader # cb = Select::reader(Handle,cb) -{ +sub reader { # cb = Select::reader(Handle,cb) my ($handle,$cb)=@_; push @READER,[$handle,$cb]; return $cb; } -sub writer # cb = Select::writer(Handle,cb) -{ +sub writer { # cb = Select::writer(Handle,cb) my ($handle,$cb)=@_; push @WRITER,[$handle,$cb]; return $cb; } -sub except # cb = Select::except(Handle,cb) -{ +sub except { # cb = Select::except(Handle,cb) my ($handle,$cb)=@_; push @EXCEPT,[$handle,$cb]; return $cb; @@ -285,8 +279,7 @@ sub reader_requeue {push @READER,$active_io} sub writer_requeue {push @WRITER,$active_io} sub except_requeue {push @EXCEPT,$active_io} -sub cancel # $cb = Select::cancel(cb) -{ +sub cancel { # $cb = Select::cancel(cb) my ($cb)=@_; defined $cb or $cb=$active_io->[1]; @READER=grep {$_->[1] != $cb} @READER; @@ -294,16 +287,14 @@ sub cancel # $cb = Select::cancel(cb) @EXCEPT=grep {$_->[1] != $cb} @EXCEPT; } -sub cancel_handle -{ +sub cancel_handle { my ($handle)=@_; @READER=grep {$_->[0] != $handle} @READER; @WRITER=grep {$_->[0] != $handle} @WRITER; @EXCEPT=grep {$_->[0] != $handle} @EXCEPT; } -sub run -{ +sub run { while (1) { $time=Donald::Tools::uptime(); while (@TIMER && $TIMER[0]->[0]<=$time) { @@ -359,21 +350,18 @@ our $UDP_MAX = 1472; # for broadcast on alphas our $SOL_SOCKET=1; our $SO_ERROR=4; -sub get_socket_error -{ +sub get_socket_error { my ($s)=@_; return unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR)); } -sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) -{ 
+sub new { # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) my ($class,@args)=@_; our $socket=new IO::Socket::INET (@args) or return undef; return bless \$socket,$class; } -sub send_data -{ +sub send_data { my ($self,$ip,$port,$data)=@_; my $ip_address=inet_aton($ip); unless (defined $ip_address) {carp("can't resolve $ip\n");return undef} @@ -381,14 +369,12 @@ sub send_data $$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n"; } -sub reader -{ +sub reader { my ($self,$sub)=@_; My::Select::reader($$self,$sub); } -sub receive_data -{ +sub receive_data { my ($self,$cb,@args)=@_; my $receive_data_cb=sub { @@ -776,8 +762,7 @@ $ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}: our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234) -our %UDP_HANDLER = -( +our %UDP_HANDLER = ( 'filedata' => \&udp_rx_filedata, 'filedata.2' => \&udp_rx_filedata, 'amdtardata' => \&udp_rx_amdtardata, @@ -1636,8 +1621,7 @@ sub sync_cluster_pw { # area routing -our %AREA_ROUTER = -( +our %AREA_ROUTER = ( wtf => '141.14.31.255', ); @@ -2030,8 +2014,7 @@ usage: $0 [options] __EOF__ use Getopt::Long; -GetOptions -( +GetOptions ( 'daemon' => \$options{'daemon'}, 'push=s' => \$options{'push'}, 'exec=s' => \$options{'exec'}, From 4f8f67475b10f0cd2f20e3df284662e67af4705f Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Mon, 27 Jan 2025 09:25:46 +0100 Subject: [PATCH 3/7] clusterd: Remove special code for alpha --- clusterd/clusterd | 42 +++++++++++++----------------------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/clusterd/clusterd b/clusterd/clusterd index 2851a3f..82f6a66 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -31,10 +31,6 @@ sub machine { return $machine; } -sub is_alpha { - return machine() eq 'alpha'; -} - sub uptime { open U,'<','/proc/uptime' or die "/proc/uptime: $!\n"; my $data; @@ -345,7 +341,7 @@ package My::Select::INET ; use Carp; use IO::Socket::INET; -our $UDP_MAX = 1472; # for 
broadcast on alphas +our $UDP_MAX = 1472; our $SOL_SOCKET=1; our $SO_ERROR=4; @@ -719,7 +715,7 @@ use Sys::Syslog; use IO::Socket::INET; use Data::Dumper; -our $UDP_MAX=1472; # for broadcast on alphas +our $UDP_MAX=1472; our $UDP_PORT=234; our $BC_RATE = 8; # packets per second broadcast our $TCP_TIMEOUT=30; # default timeout for tcp processing @@ -1012,9 +1008,6 @@ if ($machine eq 'i686') { $UTIME_OMIT=(1<<30)-2; # /usr/include/bits/stat.h $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/fcntl.h $lmtime_sub=\&lmtime_utimensat; -} elsif ($machine eq 'alpha') { - $SYS_lchown=208; # SYS_lchown in /usr/include/syscall.h - $SYS_mknod=14; # SYS_mknod } elsif ($machine eq 'amd64') { $SYS_lchown=254; # SYS_lchown in /usr/include/syscall.h $SYS_mknod=14; # SYS_mknod @@ -1198,7 +1191,6 @@ sub sample_rproc { # every 5 seconds our ($CPUS,$BOGOMIPS); sub init_cpuinfo { - Donald::Tools::is_alpha and return; open L,'<','/proc/cpuinfo' or return; while () { if (/^bogomips\s*:\s*([\d.]+)/) {$CPUS++;$BOGOMIPS+=$1;} @@ -1206,25 +1198,17 @@ sub init_cpuinfo { } sub loadavg { # AXP : (system load average) , LINUX: (system load average, pload, freebogo) - if (Donald::Tools::is_alpha) { - my $data=pack 'l!3lx4l!3'; - my $i=syscall(85,3,0,$data,1,length($data)); # table(id=TBL_LOADAVG,index=0,addr=data,nel=1,lel=56) - my ($l0,$l1,$l2,$scale,$mach0,$mach1,$mach2)=unpack 'l!3lx4l!3',$data; - $scale or return undef; - return $l0/$scale; + $CPUS or init_cpuinfo(); + my $running_proc=running_proc(); + open L,'<','/proc/loadavg' or return undef; + my $data; + sysread(L,$data,1024); + close L; + $data =~ /^(\d+\.?\d*)/ or return undef; # 5 min loadavg + if ($CPUS) { + return ($1+0,($running_proc)/$CPUS,($running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS)); } else { - $CPUS or init_cpuinfo(); - my $running_proc=running_proc(); - open L,'<','/proc/loadavg' or return undef; - my $data; - sysread(L,$data,1024); - close L; - $data =~ /^(\d+\.?\d*)/ or return undef; # 5 min loadavg - if ($CPUS) { - 
return ($1+0,($running_proc)/$CPUS,($running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS)); - } else { - return $1+0; - } + return $1+0; } } @@ -2081,7 +2065,7 @@ if (defined $options{'push'}) { My::Select::timeout(60,\&purge_old_receiver); My::Select::timeout(rand(60),\&send_stat); - My::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha; + My::Select::timeout(0,\&sample_rproc); $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init(); $my_hostname eq 'macheteinfach' and My::NetlogReceiver::init(); My::Select::timeout(600,\&check_overload); From c8ae9d742b7799a5609ce2281c0ae5c81c7f0588 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Mon, 27 Jan 2025 09:32:47 +0100 Subject: [PATCH 4/7] clusterd: Remove i686 and amd64 architectures --- clusterd/clusterd | 27 ++++----------------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/clusterd/clusterd b/clusterd/clusterd index 82f6a66..4723bfd 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -981,25 +981,11 @@ sub udp_rx_amdtardata { system '/sbin/make-automaps'; } -our ($machine,$SYS_lchown,$SYS_mknod,$lmtime_sub); +our ($machine,$SYS_lchown,$SYS_mknod); our ($SYS_utimensat,$AT_FDCWD,$UTIME_OMIT,$AT_SYMLINK_NOFOLLOW); -sub lmtime_unsupported { - my ($path,$mtime)=@_; - warn "$path: don't known how to change symlink mtime on target architecture\n"; -} -sub lmtime_utimensat { - my ($path,$mtime)=@_; - my $tsa=pack 'qqqq',0,$UTIME_OMIT,$mtime,0; - syscall($SYS_utimensat,$AT_FDCWD,$path,$tsa,$AT_SYMLINK_NOFOLLOW)==0 or return warn "$path: failed to lmtime: $!\n"; -} -$lmtime_sub=\&lmtime_unsupported; - chomp($machine=`uname -m`); -if ($machine eq 'i686') { - $SYS_lchown=198; # __NR_lchown32 in /usr/include/asm/unistd.h - $SYS_mknod=14; # __NR_mknod -} elsif ($machine eq 'x86_64') { +if ($machine eq 'x86_64') { $SYS_lchown=94; # __NR_lchown in /usr/include/asm-x86_64/unistd.h $SYS_mknod=133; # __NR_mknod @@ -1007,10 +993,6 @@ if ($machine eq 'i686') { $AT_FDCWD=-100; # 
/usr/include/fcntl.h $UTIME_OMIT=(1<<30)-2; # /usr/include/bits/stat.h $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/fcntl.h - $lmtime_sub=\&lmtime_utimensat; -} elsif ($machine eq 'amd64') { - $SYS_lchown=254; # SYS_lchown in /usr/include/syscall.h - $SYS_mknod=14; # SYS_mknod } elsif ($machine eq 'ppc64le') { $SYS_lchown=16; # __NR_lchown in /usr/include/powerpc64le-linux-gnu/asm/unistd.h $SYS_mknod=14; # __NR_mknod @@ -1019,7 +1001,6 @@ if ($machine eq 'i686') { $AT_FDCWD=-100; # /usr/include/linux/fcntl.h $UTIME_OMIT=(1<<30)-2; # /usr/include/powerpc64le-linux-gnu/bits/stat.h $AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/linux/fcntl.h - $lmtime_sub=\&lmtime_utimensat; } else { warn "unknown machine type $machine: symlink ownership can't be set.\n"; warn "unknown machine type $machine: named pipes,character and block devices can't be created\n"; @@ -1033,8 +1014,8 @@ sub lchown { sub lmtime { my ($mtime,$path)=@_; - $lmtime_sub or return; - $lmtime_sub->($path,$mtime); + my $tsa=pack 'qqqq',0,$UTIME_OMIT,$mtime,0; + syscall($SYS_utimensat,$AT_FDCWD,$path,$tsa,$AT_SYMLINK_NOFOLLOW)==0 or return warn "$path: failed to lmtime: $!\n"; } sub udp_rx_filedata { From 4c74e49e45728dd230a1ee2cc8635b5ae6db3e57 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Mon, 27 Jan 2025 16:42:53 +0100 Subject: [PATCH 5/7] clusterd: Implement standard command service Implement new udp service 'exec.2' which accepts a list of predefined commands to run. Implement new command `clusterd exec CMD...` The new usage is supposed to make `clusterd --exec` and the 'exec' UDP service obsolete. Having all commands which might be issued after file updates available in the same %CMD infrastructure will make the following changes easier and the file shorter, once the obsolete command `clusterd --exec` and the UDP services udp_rx_flush_gidcache, udp_rx_make_automaps, udp_rx_reexport and udp_rx_exec are removed. 
--- clusterd/clusterd | 42 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/clusterd/clusterd b/clusterd/clusterd index 4723bfd..c417ad2 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -769,6 +769,7 @@ our %UDP_HANDLER = ( 'reexport' => \&udp_rx_reexport, 'log' => \&udp_rx_log, 'exec' => \&udp_rx_exec, + 'exec.2' => \&udp_rx_exec2, 'push' => \&udp_rx_push, ); @@ -871,8 +872,11 @@ sub push_file { } -our %CMD=( - 'mkmotd'=>'/usr/sbin/mkmotd.pl', +our %CMD = ( + 'mkmotd' => '/usr/sbin/mkmotd.pl', + 'flush-gidcache' => 'date -d tomorrow +%s > /proc/net/rpc/auth.unix.gid/flush', + 'reexport' => '/usr/bin/mxmount --reexport-only', + 'make-automaps' => '/sbin/make-automaps', ); sub send_exec { @@ -911,6 +915,25 @@ sub udp_rx_exec { wait; } +sub udp_rx_exec2 { + my @cmd = @_; + my $pid = fork; + unless (defined $pid) { + warn "$!\n"; + return; + } + if ($pid == 0) { + open STDIN,'<','/dev/null'; + alarm(60); + chdir '/'; + for my $cmd (@cmd) { + exists $CMD{$cmd} and warn "executing $CMD{$cmd}\n"; + exists $CMD{$cmd} and system '/bin/sh', '-c', $CMD{$cmd}; + } + exit; + } +} + #------------------------------------------------------------- sub normalize_seg { # [pos,len],[pos,len],... @@ -1930,6 +1953,16 @@ sub cmd_push { } } +sub cmd_exec { + my @cmd = @_; + for my $cmd (@cmd) { + exists $CMD{$cmd} or die "$cmd: only these commands are allowed: " . join(', ', keys %CMD) . "\n"; + } + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s, 'exec.2', @cmd); +} + #------------------------------------------------------------ our $TRUSTCHECK_PORT=236; @@ -1975,6 +2008,8 @@ usage: $0 [options] --daemon # start a daemon push files.... # push files over tcp + exec CMD... 
# execute CMD on all nodes + CMD : mkmotd | flush-gidcache | reexport | make-automaps __EOF__ @@ -2062,6 +2097,9 @@ if (defined $options{'push'}) { if ($cmd eq 'push') { @args>0 or die USAGE; cmd_push(@args); + } elsif ($cmd eq 'exec') { + @args > 0 or die USAGE; + cmd_exec(@args); } else { die USAGE; } From 9da937fe3bb83b2e3eebc4edd2592b95b43fde3b Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Mon, 27 Jan 2025 18:54:20 +0100 Subject: [PATCH 6/7] clusterd: Add --post option for push Change protocol for `clusterd push`: - Multiple files can be offered with one command invocation and one UDP broadcast - The files are actually transferred by calling back with tcp to the station which offered the files. This allows the receiving node to restrict file distribution sources to specific nodes. - The list of "trusted nodes" is set to `afk` and `wtf`. - A list of predefined commands can be given in addition to the list of offered files. The commands are executed after the files are transferred. This is required, because from the point of view of the caller of `clusterd push`, the process is asynchronous, and so these commands cannot be given with additional `clusterd exec` or `clusterd --exec` commands. Expected usage is something like this: clusterd push /etc/mxpolicy --post mkmotd After the daemons have been updated, callers should be switched to the new commands and then the old commands should be removed.
--- clusterd/clusterd | 148 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 139 insertions(+), 9 deletions(-) diff --git a/clusterd/clusterd b/clusterd/clusterd index c417ad2..17e13fa 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -771,6 +771,7 @@ our %UDP_HANDLER = ( 'exec' => \&udp_rx_exec, 'exec.2' => \&udp_rx_exec2, 'push' => \&udp_rx_push, + 'push.2' => \&udp_rx_push2, ); sub udp_message { @@ -1580,6 +1581,13 @@ sub send_tcp_cp { my $data=sign($CLUSTER_PW,encode(@args)); My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb); } + +sub send_tcp_cp_sync { + my ($s, @args) = @_; + my $data = sign($CLUSTER_PW, encode(@args)); + $s->printflush(pack('n', length($data)) . $data); +} + #---------------------------------------------------------- sub sync_cluster_pw { @@ -1936,21 +1944,142 @@ sub udp_rx_push { }); } +our %TRUSTED_IP = ( + '141.14.28.170' => 1, # afk + '141.14.16.131' => 1, # wtf +); + +sub is_trusted_ip { + my ($ip) = @_; + return exists $TRUSTED_IP{$ip} ? 1 : 0; +} + +sub udp_rx_push2 { + my ($ip, $st_ary, $post_ary) = @_; + + unless (is_trusted_ip($ip)) { + warn "reject to pull files from $ip : not trusted\n"; + return; + } + my $pid = fork; + unless (defined $pid) { + warn "$!\n"; + return + } + $pid != 0 and return; + + if ($ip ne $my_ip) { +FILE: + for my $st_want (@$st_ary) { + my $filename=$st_want->name; + my $tmp_filename="$filename.tmp"; + unless ($st_want->type eq 'F') { + warn "$filename: type ".$st_want->type." 
not yet implemented\n"; + next; + } + my $st_is = Donald::FileInfo->lstat($st_want->name); + if ($st_is + && $st_is->type eq 'F' + && $st_is->size == $st_want->size + && $st_is->mtime == $st_want->mtime + && $st_is->uid == $st_want->uid + && $st_is->gid == $st_want->gid + && $st_is->perm == $st_want->perm + ) { + warn "$filename: already okay\n"; + next; + } + if ($st_want->size == 0) { + -e $tmp_filename and unlink($tmp_filename); + my $fh = IO::File->new($tmp_filename, O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW, 0); + unless (defined $fh) { + warn "$tmp_filename: $!\n"; + next; + } + # no need to fsync empty file + chown $st_want->uid, $st_want->gid, $tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm, $tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime, $st_want->mtime, $tmp_filename); + rename($tmp_filename, $filename) or return warn "rename $tmp_filename $filename: $!\n";; + warn "installed (empty) $filename\n"; + next; + } + my $s = new IO::Socket::INET(PeerAddr => $ip, PeerPort => $CLP_PORT); + unless ($s) { + warn "$ip: $!\n"; + next; + } + send_tcp_cp_sync($s, 'PULL', $st_want); + -e $tmp_filename and unlink($tmp_filename); + my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + unless (defined $fh) { + warn "$tmp_filename: $!\n"; + next; + } + my $bytes = $st_want->size; + while (1) { + my $buf = ""; + my $len = read($s, $buf, 1024); + if ($len < 0) { + warn "$ip $filename: $!\n"; + next FILE; + } + if ($len == 0) { + warn "$ip $filename: file received to short\n"; + next FILE; + } + if ($len > $bytes) { + warn "$ip $filename: file received to long\n"; + next FILE; + } + print $fh $buf; + $bytes -= $len; + if ($bytes == 0) { + $fh->flush(); + $fh->sync(); + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + 
rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; + warn "installed $filename\n"; + next FILE; + } + } + } + } + open STDIN, '<', '/dev/null'; + chdir '/'; + alarm(60); + for my $cmd (@$post_ary) { + exists $CMD{$cmd} and warn "executing $CMD{$cmd}\n"; + exists $CMD{$cmd} and system '/bin/sh', '-c', $CMD{$cmd}; + } + exit; +} + sub cmd_push { - my @files=@_; + my ($post, @files) = @_; + + is_trusted_ip($my_ip) or die "This command only works on a trusted host\n"; + + for my $cmd (@$post) { + exists $CMD{$cmd} or die "$cmd: only these commands are allowed: " . join(', ', keys %CMD) . "\n"; + } for my $filename (@files) { $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; -e $filename or die "$filename: no such file\n"; } - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + my @st = (); for my $filename (@files) { - my $st=Donald::FileInfo->lstat($filename); + my $st = Donald::FileInfo->lstat($filename); defined $st or die "$filename: $!\n"; $st->type eq 'F' or die "$filename: only plain files currently supported\n"; - open my $test,'<',$filename or die "$filename: $!\n"; - udp_broadcast_message($donald_s,'push',$my_ip,$st); + open my $test,'<', $filename or die "$filename: $!\n"; + push @st, $st; } + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s, 'push.2', $my_ip, \@st, $post); } sub cmd_exec { @@ -2007,8 +2136,8 @@ usage: $0 [options] --daemon # start a daemon - push files.... # push files over tcp - exec CMD... # execute CMD on all nodes + push [--post CMD] files... # push files over tcp + exec CMD... 
# execute CMD on all nodes CMD : mkmotd | flush-gidcache | reexport | make-automaps __EOF__ @@ -2024,6 +2153,7 @@ GetOptions ( 'make-automaps' => \$options{'make-automaps'}, 'reexport' => \$options{'reexport'}, 'lsof=s' => \$options{'lsof'}, + 'post=s@' => \$options{'post'}, ) or die USAGE; @@ -2096,7 +2226,7 @@ if (defined $options{'push'}) { my ($cmd,@args)=@ARGV; if ($cmd eq 'push') { @args>0 or die USAGE; - cmd_push(@args); + cmd_push($options{'post'} || [], @args); } elsif ($cmd eq 'exec') { @args > 0 or die USAGE; cmd_exec(@args); From adc5996933e9cda7f37f5ad8bba0d15d57181d57 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Mon, 27 Jan 2025 20:56:39 +0100 Subject: [PATCH 7/7] clusterd: Fix restart Currently restart isn't working, because we removed 'kill' from the options and udp_rx_restart is using it. Just exit instead; the service manager should restart the service. --- clusterd/clusterd | 37 ++++--------------------------------- 1 file changed, 4 insertions(+), 33 deletions(-) diff --git a/clusterd/clusterd b/clusterd/clusterd index 17e13fa..afcb828 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -68,28 +68,6 @@ sub decode { return @$msg; } -sub kill_previous_server { # kill_previous_server('clusterd') - my ($command)=@_; - my $ret=0; - - # quickfix - dont kill our parent which might be the init.d/script with the same name....
- - my $ppid; - for (`ps -o ppid,comm -p $$`) { - $ppid=$1 if /(\d+)/; - } - - for (`ps -Ao pid,comm`) { - my @F=split; - if ($F[1] eq $command && $F[0] ne $$ && $F[0] ne $ppid) { - kill 1=>$F[0]; - warn "stopped $command pid $F[0]\n"; - $ret=1; - } - } - return $ret; -} - #------------------------------------- package main; @@ -1244,15 +1222,10 @@ sub log_to_stat_target { # ---------------------------------------------------------- sub udp_rx_restart { - # double-fork, because kill_previous_server() won't kill its parent - my $pid=fork; - if (defined $pid && $pid==0) { - my $pid2=fork; - if (defined $pid2 && $pid==0) { - exec '/usr/sbin/clusterd','--kill','--daemon'; - die "exec failed: $!\n"; - } - } + # just exit and leave the restart to the service manager + # exit status 40 can be used with RestartForceExitStatus= + warn "received restart request. Exiting with status 40\n"; + exit 40; } sub udp_rx_flush_gidcache { @@ -2219,8 +2192,6 @@ if (defined $options{'push'}) { My::Select::run(); } elsif ($options{'lsof'}) { lsof($options{'lsof'}); -} elsif ($options{'kill'}) { - Donald::Tools::kill_previous_server('clusterd'); } else { @ARGV or die USAGE; my ($cmd,@args)=@ARGV;