Skip to content
Permalink
611389cf40
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
executable file 2167 lines (1851 sloc) 55.3 KB
#! /usr/local/system/perl/bin/perl
use warnings;
use strict;
use POSIX;
use IO::Pipe;
use Digest::MD5;
#-------------------------------------
package Donald::Tools;
our $VERSION = '1.00';
use Storable;
use Digest::MD5;
# Return the lower-cased host name of this machine with a trailing
# ".molgen.mpg.de" removed.  /bin/hostname is run only on the first call;
# the result is cached in the package variable $hostname.
sub hostname {
    our $hostname;
    return $hostname if defined $hostname;
    $hostname = lc `/bin/hostname`;
    chomp $hostname;
    $hostname =~ s/\.molgen\.mpg\.de$//;
    return $hostname;
}
# Return the output of `uname -m` (without the trailing newline).
# The command is run once; the value is cached in the package variable $machine.
sub machine {
    our $machine;
    unless (defined $machine) {
        $machine = `uname -m`;
        chomp $machine;
    }
    return $machine;
}
# True when machine() reports the architecture string 'alpha'.
sub is_alpha {
    my $arch = machine();
    return $arch eq 'alpha';
}
# Return the first number found in /proc/uptime as a numeric value.
# Dies if the file cannot be opened or read, or if it does not start with
# a number.
# Uses a lexical file handle (instead of the package-global bareword U) and
# checks the sysread result so that an I/O error is reported as such rather
# than as "bad data".
sub uptime {
    open my $fh,'<','/proc/uptime' or die "/proc/uptime: $!\n";
    my $data;
    my $got=sysread($fh,$data,1024);
    defined $got or die "/proc/uptime: $!\n";
    close $fh;
    $data =~ /^(\d+\.?\d*)/ or die "bad data from /proc/uptime: $data\n";
    return $1+0;
}
# Serialize the argument list: the arguments are wrapped in a fresh array
# reference and frozen with Storable::nfreeze.  decode() is the inverse.
sub encode {
    my @items = @_;
    return Storable::nfreeze(\@items);
}
# Prefix $data with the raw MD5 digest of $password.$data.
# The result is the 16 byte digest followed by the unmodified data.
sub sign {
    my ($password,$data)=@_;
    my $digest = Digest::MD5::md5($password.$data); # 16 byte prefix
    return $digest.$data;
}
# Verify data produced by sign().
# signed-data -> undef  (too short, or digest does not match)
# signed-data -> data   (the payload without the 16 byte digest prefix)
sub check_sign {
    my ($password,$data)=@_;
    return undef unless length($data) > 16;
    my $payload   = substr($data,16);
    my $rx_digest = substr($data,0,16);
    my $expected  = Digest::MD5::md5($password.$payload);
    return undef if $rx_digest ne $expected;
    return $payload;
}
# Inverse of encode(): thaw $data and return the stored list.
# Returns the empty list (undef in scalar context) when $data is undefined,
# cannot be thawed, or does not thaw to a plain array reference.
#
# The original dereferenced the thaw result outside the eval, so an undef
# input (e.g. the result of a failed check_sign) or a non-array payload
# raised a fatal error in the caller.  It also returned a one-element list
# containing undef on failure, which a caller cannot detect with
# "my (...)=decode(...) or return".
sub decode {
    my ($data)=@_;
    defined $data or return;
    my $msg=eval { Storable::thaw($data) };
    ref($msg) eq 'ARRAY' or return;
    return @$msg;
}
# kill_previous_server('clusterd')
# Send signal 1 to every process whose `ps` command name equals $command,
# except this process and its parent.  Returns 1 if at least one process was
# signalled, else 0.
sub kill_previous_server {
    my ($command)=@_;
    my $killed=0;
    # quickfix - dont kill our parent which might be the init.d/script with the same name....
    my $parent_pid;
    foreach my $line (`ps -o ppid,comm -p $$`) {
        $parent_pid=$1 if $line =~ /(\d+)/;
    }
    foreach my $line (`ps -Ao pid,comm`) {
        my ($pid,$comm)=split ' ',$line;
        next unless $comm eq $command;
        next unless $pid ne $$;
        next unless $pid ne $parent_pid;
        kill 1=>$pid;
        warn "stopped $command pid $pid\n";
        $killed=1;
    }
    return $killed;
}
#-------------------------------------
package main;
# Make the Donald::Tools message helpers callable in this package under their
# short names by assigning the CODE slot of each source glob.
*encode=*Donald::Tools::encode{CODE};
*sign=*Donald::Tools::sign{CODE};
*check_sign=*Donald::Tools::check_sign{CODE};
*decode=*Donald::Tools::decode{CODE};
#-------------------------------------
package Donald::FileInfo;
use warnings;
use strict;
our $VERSION = '1.00';
use Class::Struct (map {$_=>'$'} qw(name dev ino type perm nlink uid gid rdev size mtime target)) ;
use Fcntl ':mode';
# Escape a file name for the index file: every character that is not in the
# POSIX [:graph:] class, and every backslash, is replaced by \xHH (two
# lower-case hex digits of its ordinal).
sub fn_escape {
    my ($fn)=@_;
    (my $escaped=$fn) =~ s{([[:^graph:]\\])}{sprintf('\x%02x',ord $1)}ge;
    return $escaped;
}
# Reverse fn_escape(): replace each \x followed by up to two hex digits
# (case-insensitive) with the character of that ordinal.
sub fn_unescape {
    my ($fn)=@_;
    (my $plain=$fn) =~ s{\\x([0-9a-f]{0,2})}{chr(hex($1))}gie;
    return $plain;
}
#
# internal storage:
#
#    0      1     2     3      4      5       6     7     8      9      10      11
# [ name , dev , ino , type , perm , nlink , uid , gid , rdev , size , mtime , target ]
#
# index file format:
#
# 0  1     2     3    4    5       6      7
# F  path  perm  uid  gid  size    mtime  hardlink
# D  path  perm  uid  gid  -       mtime  -
# L  path  perm  uid  gid  target  -      hardlink
# B  path  perm  uid  gid  rdev    -      hardlink
# C  path  perm  uid  gid  rdev    -      hardlink
# P  path  perm  uid  gid  -       -      hardlink
# S  path  perm  uid  gid  -       -      hardlink
# Return this entry's name passed through fn_escape().
sub name_escaped {
    my $self=shift;
    my $name=$self->name;
    return fn_escape($name);
}
# Return "<dev>.<ino>" for this entry.
sub fileid {
    my $self=shift;
    my $dev=$self->dev;
    my $ino=$self->ino;
    return "$dev.$ino";
}
# Return the index-file fields for this entry as a list:
#   ( type, escaped name, perm, uid, gid, detail, mtime )
# detail is the size for 'F', the escaped link target for 'L', rdev for
# 'B'/'C' and '-' otherwise; mtime is '-' unless the type is 'F' or 'D'.
sub export_index {
    my ($self)=@_;
    my $type=$self->type;

    my $detail;
    if    ($type eq 'F')                 { $detail=$self->size }
    elsif ($type eq 'L')                 { $detail=fn_escape($self->target) }
    elsif ($type eq 'B' || $type eq 'C') { $detail=$self->rdev }
    else                                 { $detail='-' }

    my $mtime='-';
    $mtime=$self->mtime if $type eq 'F' || $type eq 'D';

    return (
        $type,
        fn_escape($self->name),
        $self->perm,
        $self->uid,
        $self->gid,
        $detail,
        $mtime,
    );
}
# Constructor: build an entry from the fields of one index-file record
# (type, escaped path, perm, uid, gid, detail, mtime).  dev, ino and nlink
# are not part of the record and are set to 0.
sub import_index {
    my ($class,@F)=@_;
    my ($type,$path,$perm,$uid,$gid,$detail,$mtime)=@F[0..6];
    my $is_device=($type eq 'B' || $type eq 'C');
    my $has_mtime=($type eq 'F' || $type eq 'D');
    my @record=(
        fn_unescape($path),                           # name
        0,0,                                          # dev,ino
        $type,                                        # type
        $perm,                                        # perm
        0,                                            # nlink
        $uid,$gid,                                    # uid,gid
        ($is_device ? $detail : 0),                   # rdev
        ($type eq 'F' ? $detail : 0),                 # size
        ($has_mtime ? $mtime : 0),                    # mtime
        ($type eq 'L' ? fn_unescape($detail) : ''),   # target
    );
    return bless \@record,$class;
}
# Constructor: lstat $filename and return an entry describing it.
# Returns undef when the file does not exist (errno 2); dies on any other
# lstat/readlink error and on file types that have no type letter.
sub lstat {
    my ($class,$filename)=@_;
    my @st=CORE::lstat($filename);
    unless (@st) {
        return undef if $!==2; # ENOENT
        die "$filename: $!\n";
    }
    my $target;
    if (-l _) {
        $target=readlink($filename);
        defined $target or die "$filename: $!\n";
    }
    my $mode=$st[2];
    my $type;
    if    (S_ISREG($mode))  { $type='F' }
    elsif (S_ISDIR($mode))  { $type='D' }
    elsif (S_ISLNK($mode))  { $type='L' }
    elsif (S_ISBLK($mode))  { $type='B' }
    elsif (S_ISCHR($mode))  { $type='C' }
    elsif (S_ISFIFO($mode)) { $type='P' }
    elsif (S_ISSOCK($mode)) { $type='S' }
    else                    { die ("$filename: unsupported file type\n") }
    my ($dev,$ino,$nlink,$uid,$gid,$rdev,$size,$mtime)=@st[0,1,3,4,5,6,7,9];
    my $self=[
        $filename,                      # name
        $dev,$ino,                      # dev,ino
        $type,                          # type
        S_IMODE($mode),                 # perm
        $nlink,$uid,$gid,$rdev,$size,   # nlink,uid,gid,rdev,size
        $mtime,                         # mtime
        ($type eq 'L' ? $target : ''),  # target
    ];
    return bless $self,$class;
}
#-------------------------------------
package My::Select;
# Cached time of the event loop, as returned by Donald::Tools::uptime().
# run() refreshes it at the start of every loop pass.
our $time=Donald::Tools::uptime();
# Accessor for the cached loop time.
sub My::Select::time
{
return $time;
}
our @TIMER=(); # ( [duetime,cb] , ... ) sorted by time
our $active_timer_cb; # set by run() to the timer callback being executed, undef otherwise
# cb=Select::timeout(seconds,cb)
# Schedule cb to be called once, $delta seconds after the cached loop time.
# The new entry is put in front of the list before sorting by due time.
sub timeout
{
    my ($delta,$cb)=@_;
    unshift @TIMER,[$time+$delta,$cb];
    @TIMER=sort {$a->[0] <=> $b->[0]} @TIMER;
    return $cb;
}
# cb=Select::timeout_cancel(cb)
# Remove every pending timer whose callback is $cb.
sub timeout_cancel
{
    my ($cb)=@_;
    my @kept;
    for my $timer (@TIMER) {
        push @kept,$timer unless $timer->[1] == $cb;
    }
    @TIMER=@kept;
    return $cb;
}
# Select::timeout_requeue(seconds)
# Schedule the timer callback that is currently being run again.
sub timeout_requeue
{
    my $delta=shift;
    timeout($delta,$active_timer_cb);
}
#------------------------------------
# Registration queues used by run(); each element is [Handle,cb].
our @READER; # ( [Handle,cb] , ... )
our @WRITER;
our @EXCEPT;
# Entry whose callback run() is currently executing (undef otherwise).
our $active_io; # [Handle,cb]
# cb = Select::reader(Handle,cb)
# Append [Handle,cb] to the reader queue.
sub reader
{
    my $entry=[@_[0,1]];
    push @READER,$entry;
    return $entry->[1];
}
# cb = Select::writer(Handle,cb)
# Append [Handle,cb] to the writer queue.
sub writer
{
    my $entry=[@_[0,1]];
    push @WRITER,$entry;
    return $entry->[1];
}
# cb = Select::except(Handle,cb)
# Append [Handle,cb] to the exception queue.
sub except
{
    my $entry=[@_[0,1]];
    push @EXCEPT,$entry;
    return $entry->[1];
}
# Re-register the entry whose callback is currently running ($active_io)
# on the reader, writer or exception queue respectively.
sub reader_requeue {push @READER,$active_io}
sub writer_requeue {push @WRITER,$active_io}
sub except_requeue {push @EXCEPT,$active_io}
# $cb = Select::cancel(cb)
# Remove every registration of callback $cb from the reader, writer and
# exception queues.  Without an argument the callback of the entry that is
# currently being run is used.  Returns the callback (as the usage line
# above and timeout_cancel() do), or nothing when there is none to cancel.
sub cancel
{
    my ($cb)=@_;
    # Only look at $active_io when it is set, so it is not autovivified.
    $cb=$active_io->[1] if !defined $cb && defined $active_io;
    defined $cb or return;
    for my $queue (\@READER,\@WRITER,\@EXCEPT) {
        @$queue=grep {$_->[1] != $cb} @$queue;
    }
    return $cb;
}
# Select::cancel_handle(Handle)
# Drop every registration for $handle from all three queues.
sub cancel_handle
{
    my ($handle)=@_;
    my $other=sub { !($_[0]->[0] == $handle) };
    @READER=grep {$other->($_)} @READER;
    @WRITER=grep {$other->($_)} @WRITER;
    @EXCEPT=grep {$other->($_)} @EXCEPT;
}
# Main event loop; does not return.
# Each pass refreshes the cached loop time, runs all due timers, then waits
# up to one second in select() on the registered handles and runs at most
# one ready callback from each queue (reader, writer, except).
# Timers and I/O registrations are one-shot: an entry is removed from its
# list before its callback runs, and the callback has to re-register
# (timeout_requeue / *_requeue) to be called again.
sub run
{
while (1) {
$time=Donald::Tools::uptime();
# run every timer that is due; @TIMER is kept sorted by due time
while (@TIMER && $TIMER[0]->[0]<=$time) {
$active_timer_cb=(shift @TIMER)->[1];
$active_timer_cb->();
}
$active_timer_cb=undef;
# build the select() bit vectors from the current registrations
my ($rvec,$wvec,$evec)=('','','');
for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 } ;
for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 } ;
for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ;
# wait at most one second so that timers are checked regularly
my $ready=select($rvec,$wvec,$evec,1);
if ($ready>0) {
# first ready reader: unregister it, expose it as $active_io, run it
for (my $i=0;$i<@READER;$i++) {
if (vec($rvec,$READER[$i]->[0]->fileno,1)) {
$active_io=splice @READER,$i,1;
$active_io->[1]->();
$active_io=undef;
last;
}
}
# first ready writer, same procedure
for (my $i=0;$i<@WRITER;$i++) {
if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) {
$active_io=splice @WRITER,$i,1;
$active_io->[1]->();
$active_io=undef;
last;
}
}
# first handle with an exceptional condition, same procedure
for (my $i=0;$i<@EXCEPT;$i++) {
if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) {
$active_io=splice @EXCEPT,$i,1;
$active_io->[1]->();
$active_io=undef;
last;
}
}
}
}
}
#--------------------------------------
package My::Select::INET ;
use Carp;
use IO::Socket::INET;
our $UDP_MAX=1472; # for broadcast on alphas
# Numeric socket option constants.  Kept for compatibility with code that may
# still read them; get_socket_error() now takes the values from the Socket
# module (loaded by IO::Socket::INET) because the numbers differ between
# platforms.
our $SOL_SOCKET=1;
our $SO_ERROR=4;
# Return the pending error number (SO_ERROR) of socket $s, 0 if none.
# If getsockopt itself fails, the errno of that failure is returned instead
# of an undefined value, so that a caller doing "$!=get_socket_error($s)"
# does not mistake the failure for success.
sub get_socket_error
{
    my ($s)=@_;
    my $packed=getsockopt($s,Socket::SOL_SOCKET(),Socket::SO_ERROR());
    defined $packed or return $!+0;
    return unpack('i',$packed);
}
# ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT )
# Constructor: create an IO::Socket::INET with the given arguments and return
# a blessed reference to a scalar holding it, or undef if the socket could
# not be created.
# The socket is stored in a lexical: with the former package variable every
# object referenced the same scalar, so creating a second object replaced the
# socket of the first one.
sub new
{
    my ($class,@args)=@_;
    my $socket=IO::Socket::INET->new(@args) or return undef;
    return bless \$socket,$class;
}
# $self->send_data($ip,$port,$data)
# Send one datagram to $ip:$port.  Carps and returns undef when $ip cannot be
# resolved or $data is longer than $UDP_MAX; carps when the send fails.
sub send_data
{
    my ($self,$ip,$port,$data)=@_;
    my $ip_address=inet_aton($ip);
    if (!defined $ip_address) {
        carp("can't resolve $ip\n");
        return undef;
    }
    if (!(length($data)<=$UDP_MAX)) {
        carp("message to long\n");
        return undef;
    }
    my $destination=pack_sockaddr_in($port,$ip_address);
    $$self->send($data,0,$destination) or carp "$!\n";
}
# $self->reader($sub)
# Register $sub as a one-shot read callback for this object's socket.
sub reader
{
    my ($self,$sub)=@_;
    my $socket=$$self;
    My::Select::reader($socket,$sub);
}
# $self->receive_data($cb)
# Register a persistent receiver on this object's socket.  For every datagram
# $cb is called as $cb->($data,$peer_address,$peer_port) with the peer address
# in dotted-quad form.  Extra arguments are accepted but not used.
#
# A failed recv() (no peer address returned) is now skipped: the original
# passed the undefined value to unpack_sockaddr_in, which raised a fatal
# error inside the event loop and left the reader unregistered.
sub receive_data
{
    my ($self,$cb,@args)=@_;
    my $receive_data_cb=sub {
        my $data;
        my $peer=$$self->recv($data,$UDP_MAX);
        # re-arm first, so the receiver survives a failed recv
        My::Select::reader_requeue();
        (defined $peer && length $peer) or return;
        my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer);
        my $udp_peer_addr=inet_ntoa($peer_iaddr);
        $cb->($data,$udp_peer_addr,$udp_peer_port);
    };
    My::Select::reader($$self,$receive_data_cb);
}
# send_tcp($socket,$data,$timeout,$cb);
#
# Send data asynchronously over a non-blocking tcp socket.
# The callback is called without arguments when done ($!=0) or on error
# ($! set; 110 when no progress was possible within $timeout seconds).
#
# all arguments required
#
sub send_tcp {
my ($s,$data,$timeout,$cb)=@_;
# try to send everything right away
my $len=$s->send($data,0);
defined $len or return $cb->();
if ($len==length($data)) {
$!=0;
$cb->();
return;
}
# partial send: $pos is the offset of the first byte not yet sent
my $pos=$len;
# timer callback: give up, unregister the socket and report errno 110
my $cb_tmo=sub {
My::Select::cancel_handle($s);
$!=110;
$cb->();
};
# writer callback: send the rest; on another partial send re-arm both the
# timeout and the writer registration
my $cb_write=sub {
My::Select::timeout_cancel($cb_tmo);
my $len=send($s,substr($data,$pos),0);
defined $len or return $cb->();
if ($len==length($data)-$pos) {
$!=0;
$cb->();
return;
}
$pos+=$len;
My::Select::timeout($timeout,$cb_tmo);
My::Select::writer_requeue();
};
My::Select::timeout($timeout,$cb_tmo);
My::Select::writer($s,$cb_write);
}
# $socket = connect_tcp ($ip,$port,$timeout,$cb)
#
# Asynchronously connect a tcp socket.
# The callback is called without arguments when the socket becomes writable
# (with $! set from the socket's pending error) or when $timeout seconds have
# passed (with $! set to 110).  If the socket cannot be created the callback
# is called immediately and its result is returned.
#
# all arguments required
#
sub connect_tcp {
    my ($ip,$port,$timeout,$cb)=@_;
    my $s=IO::Socket::INET->new(PeerAddr=>$ip,PeerPort=>$port,Blocking=>0);
    return $cb->() unless defined $s;
    my $on_timeout=sub {
        My::Select::cancel_handle($s);
        $!=110;
        $cb->();
    };
    my $on_writable=sub {
        My::Select::timeout_cancel($on_timeout);
        $!=get_socket_error($s);
        $cb->();
    };
    My::Select::timeout($timeout,$on_timeout);
    My::Select::writer($s,$on_writable);
    return $s;
}
# read_with_timeout($socket,$callback,$timeout)
#
# Asynchronously read once from a tcp socket.
# The callback receives the data read (up to 102400 bytes, $! set to 0), or
# undef on a read error or when $timeout seconds pass without the socket
# becoming readable ($! set to 110 in the timeout case).
#
sub read_with_timeout {
    my ($s,$cb,$timeout)=@_;
    my $on_timeout=sub {
        My::Select::cancel_handle($s);
        $!=110;
        $cb->(undef);
    };
    my $on_readable=sub {
        My::Select::timeout_cancel($on_timeout);
        my $buf='';
        my $got=sysread($s,$buf,102400,0);
        return $cb->(undef) unless defined $got;
        $!=0;
        $cb->($buf);
    };
    My::Select::timeout($timeout,$on_timeout);
    My::Select::reader($s,$on_readable)
}
#--------------------------------------
package My::Cluster::Updown;
#
# monitor nodes
#
our %H; # ( name=> [state , last_seen , expect_seq , @data ] , ... )
our $MONITOR_STARTING=1;
our %DUPLICATES;
# data currently:
# 0: float load-average
# 1: clusterd version
# 2: processor load
# 3: processor capacity
# 4: string unix version
use Storable;
# Write the node table %H to the state file with Storable.
sub save_state {
    my $state_file='/root/clusterd.monitor.state';
    store(\%H,$state_file);
}
# Load the node table %H from the state file, if it exists, and reset the
# last-seen time of every node to 0.
sub restore_state {
    my $state_file='/root/clusterd.monitor.state';
    if (-e $state_file) {
        %H=%{retrieve($state_file)};
    }
    foreach my $node (values %H) {
        $node->[1]=0;
    }
}
# status($f)
# Print a report of the monitored nodes to the handle object $f (anything
# with print and printf methods): the UP and DOWN host lists, the ten UP
# hosts with the highest value in field 3 (load average), field 5 (processor
# load, shown as percent) and field 6 (free capacity), and the sum of field 6
# over all UP hosts.
#
# The first two "TOP 10" lists used to print eleven entries because the
# counter was tested only after printing; all three lists now stop at ten.
sub status {
    my ($f)=@_;
    my (@up,@down);
    for my $host (sort keys %H) {
        my $state=$H{$host}->[0];
        if    ($state eq 'UP')   { push @up,$host }
        elsif ($state eq 'DOWN') { push @down,$host }
    }
    $f->printf("UP (%d): %s\n\n",scalar(@up),join(' ',@up));
    $f->printf("DOWN (%d): %s\n\n",scalar(@down),join(' ',@down));

    # first ten elements of a list (or all of them if there are fewer)
    my $top10=sub { my @hosts=@_; $#hosts=9 if @hosts>10; return @hosts };

    my @by_load=sort {$H{$b}->[3] <=> $H{$a}->[3]} @up;
    $f->print("TOP 10: ");
    for my $host ($top10->(@by_load)) {
        $f->print(sprintf "%s (%4.2f) ",$host,$H{$host}->[3]);
    }
    $f->print("\n\n");

    my @by_pload=sort {($H{$b}->[5]||0) <=> ($H{$a}->[5]||0)} @up;
    $f->print("TOP 10 CPU load : ");
    for my $host ($top10->(@by_pload)) {
        $f->print(sprintf "%s (%3.1f%%) ",$host,($H{$host}->[5]||0)*100);
    }
    $f->print("\n\n");

    my @by_capacity=sort {($H{$b}->[6]||0) <=> ($H{$a}->[6]||0)} @up;
    $f->print("TOP 10 free capacity : ");
    for my $host ($top10->(@by_capacity)) {
        $f->print(sprintf "%s (%3.1f) ",$host,$H{$host}->[6]||0);
    }
    # the total covers every UP host, not only the ten printed above
    my $total_bogomips=0;
    $total_bogomips+=($H{$_}->[6]||0) for @up;
    $f->printf("\ntotal available bogomips: %d\n\n",$total_bogomips);
}
# Log $text with warn and send it, prefixed with the local time, to
# main::mgmt_print_all.
sub msg_text {
    my ($text)=@_;
    warn "$text\n";
    my $stamp=localtime;
    main::mgmt_print_all("$stamp: $text\n");
}
# Report a state transition of $hostname as one formatted msg_text() line.
sub msg_state {
    my ($state_old,$state_new,$hostname,$extra)=@_;
    my $line=sprintf("%-4s -> %-4s %-20s %s",$state_old,$state_new,$hostname,$extra);
    msg_text($line);
}
# Start the node monitor: load the saved state, announce recovery mode and
# schedule, 630 seconds from now, the end of recovery mode and the first run
# of timeout_hosts.
sub init {
    restore_state();
    msg_text('node monitor: started. (recovery mode)');
    my $end_recovery=sub {
        $MONITOR_STARTING=0;
        msg_text('node monitor: recovery finished');
    };
    My::Select::timeout(630,$end_recovery);
    My::Select::timeout(630,\&timeout_hosts);
}
# Timer callback, re-queued every 60 seconds: mark every UP node that has not
# been seen for 1230 seconds as DOWN, then save the state.
sub timeout_hosts {
    My::Select::timeout_requeue(60);
    my $deadline=My::Select::time()-1230; # 2x10 minutes + 30 seconds
    foreach my $name (keys %H) {
        my $h=$H{$name};
        next unless $h->[0] eq 'UP';
        next unless $h->[1]<=$deadline;
        msg_state('UP','DOWN',$name,"timeout!");
        $h->[0]='DOWN';
    }
    save_state();
}
# Process one host announcement and update the node table %H.
# The table stores, per host, the state, the loop time of the last
# announcement, the sequence number expected next ($seq+1) and the remaining
# announcement data.  Depending on the previous state and on how $seq relates
# to the expected sequence number a state message is logged; while
# $MONITOR_STARTING is set some of these messages are suppressed.
sub rx_hostannounce {
# ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)
my ($host,$seq,@more)=@_;
my ($load_average,$version,$pload,$pcapacity,$unixrev)=@more;
$unixrev ||= '?';
unless (exists $H{$host}) {
# host not known yet
if ($seq==0) {
msg_state('NEW','UP',$host,"discovered new node $version - $unixrev");
} else {
$MONITOR_STARTING or msg_state('NEW','UP',$host,"discovered running node $version - $unixrev");
}
$H{$host}=['UP',My::Select::time,$seq+1,@more];
} else {
my $h=$H{$host};
if ($h->[0] eq 'UP') {
if ($seq == $h->[2]) {
# expected sequence number: nothing to report
;
} else {
if ($seq==0) {
# sequence restarted at 0; the report for this case is disabled
# msg_state('UP','UP',$host,"node rebootet $version - $unixrev");
} elsif ($seq>$h->[2]) {
# gap in the sequence
$MONITOR_STARTING or msg_state('UP','UP',$host,$seq-$h->[2]." packet(s) lost!");
} elsif ($seq+1==$h->[2]) {
# same sequence number as the previous announcement
warn "DUPLICATE from $host\n";
$DUPLICATES{$host}=($DUPLICATES{$host}||0)+1;
} else {
# sequence number went backwards, but not to 0
msg_state('UP','UP',$host,"node rebooted and ".$seq." packet(s) lost!");
}
}
} else {
# host was not UP: every announcement brings it back
if ($seq==0) {
msg_state('DOWN','UP',$host,"node rebooted $version - $unixrev");
} elsif ($seq==$h->[2]) {
msg_state('DOWN','UP',$host,"sequence error. Node to slow? (seq=$seq,exp=".$h->[2].")");
} else {
msg_state('DOWN','UP',$host,$seq-$h->[2]." packet(s) lost");
}
}
# record the new state in place
@$h=('UP',My::Select::time,$seq+1,@more);
}
}
# Remove $host from the node table; logs a message if it was present.
sub delete_host {
    my ($host)=@_;
    my $h=delete $H{$host};
    $h or return;
    msg_text("host $host removed from monitor");
}
#-----------------------------------------------------------------------
package My::NetlogReceiver;
our $listen_socket;
our $TCP_MAX=1024;
our $DAY_LAST_MSG;
# Return the current local date as a YYYYMMDD string.
sub day {
    my (undef,undef,undef,$mday,$mon,$year)=localtime;
    return sprintf("%04d%02d%02d",$year+1900,$mon+1,$mday);
}
# Log a separator line once when the date has advanced past the date of the
# last separator ($DAY_LAST_MSG).
sub bigben {
    my $today=day();
    return if $today le $DAY_LAST_MSG;
    warn "NETLOG ==================================================== morning has broken ====\n";
    $DAY_LAST_MSG=$today;
}
# Timer callback: check for a date change and re-queue itself in 30 seconds.
sub bigben_timer {
bigben();
My::Select::timeout_requeue(30);
}
# Remember the current date and start the 30 second bigben timer.
sub bigben_init {
$DAY_LAST_MSG=day();
My::Select::timeout(30,\&bigben_timer);
}
# Read callback for one netlog connection.
# Appends the received bytes to the buffer behind $bufref and logs every
# complete record in it.  A record is a 2-byte length (unpack 'n') followed by
# that many bytes of message.  The reader is re-queued only after data was
# received; on a recv error or an empty read the sub returns without
# re-queueing.
sub receive {
    my ($socket,$peernode,$bufref)=@_;
    bigben();
    my $chunk;
    return unless defined $socket->recv($chunk,$TCP_MAX);
    # length $chunk or warn "$peernode: disconnect\n";
    return unless length $chunk;
    $$bufref.=$chunk;
    while (length($$bufref)>=2) {
        my $msg_len=unpack('n',$$bufref);
        # warn "wait for $msg_len+2 bytes got ".length($$bufref)."\n";
        last if length($$bufref)<2+$msg_len;
        my $msg=substr($$bufref,2,$msg_len);
        $$bufref=substr($$bufref,2+$msg_len);
        $|=1;
        warn "NETLOG $msg\n" unless $msg=~/NETLOG/;
    }
    My::Select::reader_requeue();
}
# Read callback of the listening socket: accept one connection, make it
# non-blocking and register receive() for it with a fresh per-connection
# buffer.
# accept() can fail (it returns undef); that case is now logged and skipped
# instead of raising a fatal "method call on undefined value" error inside
# the event loop.
sub connect_request {
    # keep listening, whatever happens to this connection
    My::Select::reader_requeue();
    my $socket=$listen_socket->accept();
    unless (defined $socket) {
        warn "netlog accept: $!\n";
        return;
    }
    $socket->blocking(0);
    my $peernode=$socket->peerhost;
    my $buffer='';
    My::Select::reader($socket,sub{receive($socket,$peernode,\$buffer)});
    # warn "$peernode: connect\n";
}
# Open the netlog listening socket on tcp port 1028 (dies on failure),
# register connect_request() for it and start the bigben timer.
sub init {
    my %listen_args=(Proto=>'tcp',LocalPort=>1028,Listen=>1,ReuseAddr=>1);
    $listen_socket=IO::Socket::INET->new(%listen_args) or die "$!\n";
    My::Select::reader($listen_socket,\&connect_request);
    bigben_init();
}
#------------------------------------------------------------------------
package main;
use strict;
use IO::File;
use Sys::Syslog;
use IO::Socket::INET;
use Data::Dumper;
our $UDP_MAX=1472; # for broadcast on alphas
our $UDP_PORT=234;
our $BC_RATE=8; # packets per second broadcast
our $TCP_TIMEOUT=30; # default timeout for tcp processing
our (%options); # RUN OPTIONS
our $donald_s; # My::Select::INET udp socket
our $my_hostname;
our $my_ip; # '141.14.12.12'
# short, lower-case host name without the site domain
$my_hostname=lc `/bin/hostname`;
chomp($my_hostname);
$my_hostname =~ s/\.molgen\.mpg\.de$//;
# Wait until our own host name resolves, retrying every 30 seconds.
{
    # The flag lives outside the loop so that the warning is printed only
    # once; declared inside the loop it was reset on every pass.
    my $warned;
    while (1) {
        my $addr=inet_aton($my_hostname);
        $my_ip=inet_ntoa($addr) if defined $addr;
        last if defined $my_ip;
        unless ($warned) {
            warn "no IP (yet)= - waiting\n";
            $warned=1;
        }
        sleep 30;
    }
}
our $my_unixrev; # output of `uname -r`
$my_unixrev=`uname -r`;
chomp($my_unixrev);
our $CLUSTER_PW;
our $CLUSTER_PW_FILE='/etc/clusterd.password';
our $OLD_CLUSTER_PW_FILE='/root/clusterd.password';
our $CLUSTER_PW_TIMESTAMP=0;
$ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}:''); # for ps , tar (gnu!)
#---------------------------------------------------------- UDP
# Address and port of the sender of the datagram that is currently being
# processed; set by udp_message() before the handler is called.
our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234)
# Dispatch table used by udp_message(): the first element of a decoded
# message selects the handler, the remaining elements are its arguments.
our %UDP_HANDLER=
(
'filedata' => \&udp_rx_filedata,
'filedata.2' => \&udp_rx_filedata,
'amdtardata' => \&udp_rx_amdtardata,
'loadavg.2' => \&udp_rx_loadavg2,
'restart' => \&udp_rx_restart,
'flush-gidcache' => \&udp_rx_flush_gidcache,
'make-automaps' => \&udp_rx_make_automaps,
'reexport' => \&udp_rx_reexport,
'log' => \&udp_rx_log,
'exec' => \&udp_rx_exec,
'push' => \&udp_rx_push,
);
# udp_message($data,$peer_addr,$peer_port,$donald_s)
# Entry point for received datagrams: remember the sender in
# $udp_peer_addr/$udp_peer_port, verify the signature with the cluster
# password, decode the message and call the handler registered in
# %UDP_HANDLER for its first element with the remaining elements.
# Datagrams are dropped silently when no password is loaded, the signature
# is bad, the payload does not decode, or no handler is registered.
#
# The signature result is checked before decoding: previously a datagram
# with a bad signature handed undef to decode(), and the "or return" on the
# list assignment did not trigger for a one-element (undef) result.
sub udp_message {
    my ($data,$x_udp_peer_addr,$x_udp_peer_port,$donald_s)=@_;
    ($udp_peer_addr,$udp_peer_port)=($x_udp_peer_addr,$x_udp_peer_port);
    defined $CLUSTER_PW or return;
    defined $data or return;
    my $payload=check_sign($CLUSTER_PW,$data);
    defined $payload or return;
    my ($handler_name,@args)=decode($payload);
    defined $handler_name or return;
    exists $UDP_HANDLER{$handler_name} or return;
    $UDP_HANDLER{$handler_name}->(@args);
}
# udp_send_message( dst, @args)
# dst='141.14.31.255' 'zork' '141.14.16.1' etc. data is anything
# Encode and sign @args and send them to dst on $UDP_PORT.  Does nothing
# while no cluster password is loaded.
sub udp_send_message {
    my ($ip,@args)=@_;
    return unless defined $CLUSTER_PW;
    my $packet=sign($CLUSTER_PW,encode(@args));
    $donald_s->send_data($ip,$UDP_PORT,$packet);
}
#----------------------------------------------------------
# push_amd_tar($donald_s)
# Pack /etc/amd into /tmp/amd.tar, compute a digest of the tar file, gzip it
# and broadcast /tmp/amd.tar.gz in 1024 byte pieces as 'amdtardata' messages,
# each carrying the file info, the offset, the data and the digest.
# Returns early (after a warning where one is coded) when a step fails.
sub push_amd_tar {
my ($donald_s)=@_;
my $filename='/tmp/amd.tar';
# child 1: create the tar archive from inside /etc/amd
my $pid=fork;
defined $pid or return warn "$!\n";
unless($pid) {
chdir '/etc/amd' or die "/etc/amd: $!\n";
exec 'tar','cf',$filename,'.';
die "$!\n";
}
wait;
$? and return;
my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n";
# binary MD5 digest of the uncompressed tar file
my $digest=Digest::MD5->new->addfile($fh)->digest;
# NOTE(review): md5_hex() hashes its argument, so this logs the MD5 of the
# digest rather than the digest in hex; udp_rx_amdtardata logs it the same way.
warn "tar digest is ",Digest::MD5::md5_hex($digest),"\n";
# child 2: compress the archive (replaces it by $filename.gz)
$pid=fork;
defined $pid or return warn "$!\n";
unless($pid) {
exec 'gzip','-f',$filename;
die "$!\n";
}
wait;
$? and return;
$filename='/tmp/amd.tar.gz';
my $st=Donald::FileInfo->lstat($filename);
defined $st or return warn "$filename: $!\n";
$st->type eq 'F' or return warn "$filename: not a plain file\n";
$fh=new IO::File $filename,'<' or return warn "$filename: $!\n";
my $i=0;
for (my $pos=0;$pos<$st->size;$pos+=1024) {
my $data;
defined $fh->sysread($data,1024) or return warn "$filename: $!\n";
# warn "send bytes $pos to ",$pos+length($data),"\n";
# rate limit: sleep one second whenever the counter is a multiple of $BC_RATE
++$i % $BC_RATE or sleep 1;
udp_broadcast_message($donald_s,'amdtardata',$st,$pos,$data,$digest);
}
}
# push_file($donald_s,$filename)
# Broadcast one file.  $filename must be an absolute path.
# A symlink is sent as a single 'filedata.2' message; a regular file of at
# most 80000 bytes is sent as 'filedata' messages of up to 1024 bytes each
# (a single empty message for an empty file), sleeping one second after
# every $BC_RATE messages.  Dies for bigger files and for other file types;
# warns and returns on other errors.
sub push_file {
    my ($donald_s,$filename)=@_;
    $filename =~ m"^/" or return warn "$filename: please use absolute path\n";
    my $st=Donald::FileInfo->lstat($filename);
    defined $st or return warn "$filename: $!\n";
    my $type=$st->type;
    if ($type eq 'L') {
        udp_broadcast_message($donald_s,'filedata.2',$st,0,'');
        return;
    }
    $type eq 'F' or die "file type not supported\n";
    my $rpc='filedata';
    $st->size<=80000 or die "$filename: to big for broadcast (max 80000 bytes)\n";
    if ($st->size==0) {
        udp_broadcast_message($donald_s,$rpc,$st,0,'');
        return;
    }
    my $fh=IO::File->new($filename,'<') or return warn "$filename: $!\n";
    my $sent=0;
    for (my $pos=0;$pos<$st->size;$pos+=1024) {
        my $data;
        defined $fh->sysread($data,1024) or return warn "$filename: $!\n";
        # warn "send bytes $pos to ",$pos+length($data),"\n";
        udp_broadcast_message($donald_s,$rpc,$st,$pos,$data);
        ++$sent % $BC_RATE or sleep 1;
    }
}
# Commands that may be run through the 'exec' message: the short name sent
# over the network maps to the command line that udp_rx_exec() passes to
# /bin/sh -c.
our %CMD=(
'mkmotd'=>'/usr/sbin/mkmotd.pl',
);
# send_exec($donald_s,$cmd)
# Broadcast an 'exec' message for the command name $cmd.  Dies with the list
# of available command names when $cmd is not a key of %CMD.
sub send_exec {
    my ($donald_s,$cmd)=@_;
    exists $CMD{$cmd}
        or die "available commands: ",join(' , ',keys %CMD),"\n";
    udp_broadcast_message($donald_s,'exec',$cmd);
}
# Handler for 'exec' messages: run the command registered in %CMD under the
# name $cmd.  Unknown names are ignored.  The command is started by a
# grandchild process (double fork) with stdin, stdout and stderr on
# /dev/null, an alarm of 60 seconds and / as working directory; this process
# only waits for the intermediate child.
sub udp_rx_exec {
my ($cmd)=@_;
warn "exec $cmd\n";
exists $CMD{$cmd} or return;
my $pid;
$pid=fork;
unless (defined $pid) {
warn "$!\n";
return;
}
unless ($pid) {
# intermediate child: fork again and exit at once
$pid=fork;
defined $pid or exit 1;
$pid and exit;
# grandchild: detach the standard handles and run the command
open STDIN,'<','/dev/null';
open STDOUT,'>','/dev/null';
open STDERR,'>','/dev/null';
alarm(60);
chdir '/';
exec '/bin/sh','-c',$CMD{$cmd};
exit 1;
}
# reap the intermediate child
wait;
}
#-------------------------------------------------------------
# normalize_seg([pos,len],[pos,len],...)
# Sort the segments by position and merge every segment that overlaps or
# directly follows its predecessor.  Returns the merged list.  The surviving
# array references are those passed in; their length element is updated in
# place when segments are merged.
sub normalize_seg {
    my @merged;
    for my $seg (sort {$a->[0] <=> $b->[0]} @_) {
        my $last=$merged[-1];
        if (defined $last and $last->[0]+$last->[1] >= $seg->[0]) {
            # joinable: extend the previous segment to the larger end
            my $last_end=$last->[0]+$last->[1];
            my $seg_end=$seg->[0]+$seg->[1];
            $last->[1]=($last_end>$seg_end ? $last_end : $seg_end)-$last->[0];
        } else {
            push @merged,$seg;
        }
    }
    return @merged;
}
my %RECEIVER; # ( filename => $receiver, .... )
# $receiver : [ st_want , last_rx , io_handle , [ [pos,len] , [pos,len] , ... ] ]
# Timer callback, re-queued every 60 seconds: drop every receiver in
# %RECEIVER that has not received data for more than 10 seconds, logging the
# timeout locally and to the stat target.
sub purge_old_receiver {
    my $now=My::Select::time();
    for my $name (keys %RECEIVER) {
        my $receiver=$RECEIVER{$name};
        next unless $receiver->[1]+10<$now;
        warn 'timeout receiving ',$receiver->[0]->name,"\n";
        log_to_stat_target('timeout receiving ',$receiver->[0]->name);
        delete $RECEIVER{$name};
    }
    My::Select::timeout_requeue(60);
}
#-------------------------------------------------------------
our $INSTALLED_DIGEST='';
our $rx_filedata_done;
# Handler for 'amdtardata' messages ($st_want,$pos,$data,$digest).
# Pieces of an archive whose digest equals $INSTALLED_DIGEST are ignored.
# Other pieces are passed to udp_rx_filedata(); once that reports the file as
# complete ($rx_filedata_done), the archive is unpacked in /etc/amd with
# "tar xzf", the digest is remembered and /sbin/make-automaps is run.
#
# The wait and the install bookkeeping used to be outside the
# "if ($rx_filedata_done)" block, so they ran for every piece: a bare wait
# could then reap an unrelated child and mark the archive as installed before
# it was complete.  They now run only after tar was started, and waitpid
# waits for exactly that child.
sub udp_rx_amdtardata {
    my ($st_want,$pos,$data,$digest)=@_;
    ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo';
    ### $digest eq $INSTALLED_DIGEST and $pos==0 and warn "/etc/amd - ",Digest::MD5::md5_hex($digest)," already installed\n";
    $digest eq $INSTALLED_DIGEST and return;
    #### $pos==0 and warn "receiving /etc/amd - ",Digest::MD5::md5_hex($digest),"\n";
    udp_rx_filedata($st_want,$pos,$data);
    $rx_filedata_done or return;
    my $pid=fork;
    defined $pid or return warn "$!\n";
    unless($pid) {
        chdir '/etc/amd' or die "/etc/amd: $!\n";
        exec 'tar','xzf',$st_want->name;
        die "$!\n";
    }
    waitpid($pid,0);
    $? and return;
    warn "installed /etc/amd - ",Digest::MD5::md5_hex($digest),"\n";
    $INSTALLED_DIGEST=$digest;
    system '/sbin/make-automaps';
}
# Per-architecture settings, filled in below from the output of `uname -m`:
# syscall numbers for lchown and mknod and the sub used to set a symlink mtime.
our ($machine,$SYS_lchown,$SYS_mknod,$lmtime_sub);
# Syscall number and constants used by lmtime_utimensat().
our ($SYS_utimensat,$AT_FDCWD,$UTIME_OMIT,$AT_SYMLINK_NOFOLLOW);
# Fallback for architectures without a configured utimensat syscall:
# only warns that the symlink mtime cannot be changed.
sub lmtime_unsupported {
    my ($path,$mtime)=@_;
    warn $path.": don't known how to change symlink mtime on target architecture\n";
}
# Set the mtime of $path itself (not of a symlink target) through the
# utimensat syscall.  Warns and returns when the syscall fails.
# The time argument packs four 64-bit values: (0,$UTIME_OMIT) and ($mtime,0),
# presumably the atime and mtime timespec pair expected by utimensat.
sub lmtime_utimensat {
    my ($path,$mtime)=@_;
    my @first=(0,$UTIME_OMIT);
    my @second=($mtime,0);
    my $tsa=pack 'qqqq',@first,@second;
    my $rc=syscall($SYS_utimensat,$AT_FDCWD,$path,$tsa,$AT_SYMLINK_NOFOLLOW);
    $rc==0 or return warn "$path: failed to lmtime: $!\n";
}
# Select syscall numbers by machine type (`uname -m`).  lmtime() falls back
# to lmtime_unsupported unless a branch below configures utimensat.
$lmtime_sub=\&lmtime_unsupported;
chomp($machine=`uname -m`);
if ($machine eq 'i686') {
$SYS_lchown=198; # __NR_lchown32 in /usr/include/asm/unistd.h
$SYS_mknod=14; # __NR_mknod
} elsif ($machine eq 'x86_64') {
$SYS_lchown=94; # __NR_lchown in /usr/include/asm-x86_64/unistd.h
$SYS_mknod=133; # __NR_mknod
$SYS_utimensat=280; # /usr/include/asm/unistd_64.h
$AT_FDCWD=-100; # /usr/include/fcntl.h
$UTIME_OMIT=(1<<30)-2; # /usr/include/bits/stat.h
$AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/fcntl.h
$lmtime_sub=\&lmtime_utimensat;
} elsif ($machine eq 'alpha') {
$SYS_lchown=208; # SYS_lchown in /usr/include/syscall.h
$SYS_mknod=14; # SYS_mknod
} elsif ($machine eq 'amd64') {
$SYS_lchown=254; # SYS_lchown in /usr/include/syscall.h
$SYS_mknod=14; # SYS_mknod
} elsif ($machine eq 'ppc64le') {
$SYS_lchown=16; # __NR_lchown in /usr/include/powerpc64le-linux-gnu/asm/unistd.h
$SYS_mknod=14; # __NR_mknod
$SYS_utimensat=304; # __NR_utimensat in /usr/include/powerpc64le-linux-gnu/asm/unistd.h
$AT_FDCWD=-100; # /usr/include/linux/fcntl.h
$UTIME_OMIT=(1<<30)-2; # /usr/include/powerpc64le-linux-gnu/bits/stat.h
$AT_SYMLINK_NOFOLLOW=0x100; # /usr/include/linux/fcntl.h
$lmtime_sub=\&lmtime_utimensat;
} else {
# unknown machine: $SYS_lchown and $SYS_mknod stay undefined
warn "unknown machine type $machine: symlink ownership can't be set.\n";
warn "unknown machine type $machine: named pipes,character and block devices can't be created\n";
}
# lchown($uid,$gid,$path)
# Change the owner of $path itself through the lchown syscall.  Does nothing
# when no syscall number is configured; warns when the syscall fails.
sub lchown {
    my ($uid,$gid,$path)=@_;
    return unless $SYS_lchown;
    my $rc=syscall($SYS_lchown,$path,$uid+0,$gid+0);
    $rc==0 or return warn "$path: failed to lchown: $!\n";
}
# lmtime($mtime,$path)
# Set the mtime of $path itself using the sub selected for this architecture.
# Note the argument order: the selected sub is called as ($path,$mtime).
sub lmtime {
    my ($mtime,$path)=@_;
    return unless $lmtime_sub;
    return $lmtime_sub->($path,$mtime);
}
# Handler for 'filedata' / 'filedata.2' messages ($st_want,$pos,$data).
# $st_want describes the file as it should be; $data is the piece starting
# at offset $pos.  Sets $rx_filedata_done to 1 when this call installed a
# regular file, and to 0 otherwise.
#
#  - A regular file that already has the wanted size and mtime is left alone.
#  - A symlink is (re)created or has its owner / mtime adjusted.
#  - A file that fits into one message is written to "$filename.tmp" and
#    renamed into place.
#  - Bigger files are collected piece by piece in a receiver kept in
#    %RECEIVER and installed once the received segments cover the whole file.
#
# Fixes compared to the previous version:
#  - syswrite results are tested with defined(): writing an empty string
#    returns 0, so zero-length files could never be installed.
#  - the one-message path opens the temporary file with O_TRUNC, so a longer
#    stale "$filename.tmp" no longer leaves trailing bytes in the result.
sub udp_rx_filedata {
    # set rx_filedata_done as a side effect
    my ($st_want,$pos,$data)=@_;
    ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo';
    my $filename=$st_want->name;
    my $tmp_filename="$filename.tmp";
    $rx_filedata_done=0;
    my $st_is=Donald::FileInfo->lstat($st_want->name);
    if ($st_is && $st_is->type eq 'F' && $st_is->size==$st_want->size && $st_is->mtime==$st_want->mtime) {
        #### $pos==0 and warn " $filename seems to be current\n";
        return;
    }
    if ($st_want->type eq 'L') {
        if (!$st_is || $st_is->type ne 'L' || $st_is->target ne $st_want->target) {
            $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n");
            symlink($st_want->target,$filename) or return warn "$filename: failed to create symlink: $!\n";
            lchown($st_want->uid,$st_want->gid,$filename);
            lmtime($st_want->mtime,$filename);
            warn "installed $filename -> ".$st_want->target."\n";
        } else {
            if ($st_is->uid != $st_want->uid || $st_is->gid != $st_want->gid) {
                lchown($st_want->uid,$st_want->gid,$filename);
            }
            if ($st_is->mtime != $st_want->mtime) {
                lmtime($st_want->mtime,$filename);
            }
        }
        return;
    }
    if (length($data) == $st_want->size) {
        # complete file in one broadcast
        my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_TRUNC,0);
        defined $fh or return warn "$tmp_filename: $!\n";
        # syswrite returns 0 (not an error) for an empty file
        defined $fh->syswrite($data) or return warn "$tmp_filename: $!\n";
        $fh->close or return warn "$tmp_filename: $!\n";
        chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
        chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
        $st_is and unlink($filename);
        rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n";
        utime($st_want->mtime,$st_want->mtime,$filename);
        warn "installed $filename\n";
        $rx_filedata_done=1;
        return;
    }
    length($data) or return; # shouldn't happen.
    my $receiver=$RECEIVER{$st_want->name};
    if (defined $receiver) {
        # a receiver for a different version of the file is discarded
        if ( $receiver->[0]->size != $st_want->size or $receiver->[0]->mtime != $st_want->mtime ) {
            $receiver=undef;
        }
    }
    unless (defined $receiver) {
        # create new receiver
        ## warn "start receiving $filename from $udp_peer_addr\n";
        -e $tmp_filename and unlink($tmp_filename);
        my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0);
        defined $fh or return warn "$tmp_filename: $!\n";
        $receiver = [$st_want,My::Select::time,$fh,[]];
        $RECEIVER{$filename}=$receiver;
    }
    {
        # warn "$filename: receive $pos length ",length($data),"\n";
        # write data ( size cant be 0 here )
        $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n";
        defined $receiver->[2]->syswrite($data) or return warn "$tmp_filename: $!\n";
        $receiver->[1]=My::Select::time;
        my $s=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])];
        #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n";
        # all there ?
        if (@$s == 1 && $s->[0]->[0]==0 && $s->[0]->[1]==$st_want->size) {
            $receiver->[2]->close;
            chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
            chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
            $st_is and unlink($filename);
            rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n";
            utime($st_want->mtime,$st_want->mtime,$filename);
            warn "installed $filename\n";
            delete $RECEIVER{$filename};
            $rx_filedata_done=1;
        }
    }
}
#----------------------------------------------------------- sample running proc every 5 seconds
our @proc_running=(0)x12; # 12 samples every 5 seconds -> minute average
our @proc_running_10=(0)x10; # 10 samples every minute -> 10 minute average
our $SAMPLE_TICK=0;
# Return the number on the "procs_running" line of /proc/stat, or 0 when the
# file cannot be opened or has no such line.
# Reads through a lexical handle into a lexical variable: the previous
# "while (<L>)" overwrote the caller's $_ and left the bareword handle L open
# when returning from inside the loop.
sub xget_cur_running_proc {
    open my $stat,'<','/proc/stat' or return 0;
    while (my $line=<$stat>) {
        return $1 if $line =~ /^procs_running\s*(\d+)/;
    }
    return 0;
}
# Return the current number of running processes and call pload_debug() when
# it exceeds the number of CPUs by more than one.
sub get_cur_running_proc {
    our $CPUS;
    my $running=xget_cur_running_proc();
    pload_debug("more than $CPUS+1 proc") if $CPUS && $running>$CPUS+1;
    return $running;
}
# Average of the samples in @proc_running.
sub running_proc {
    my $sum=0;
    foreach my $sample (@proc_running) {
        $sum+=$sample;
    }
    return $sum/scalar(@proc_running);
}
# Average of the samples in @proc_running_10.
sub running_proc_10 {
    my $sum=0;
    foreach my $sample (@proc_running_10) {
        $sum+=$sample;
    }
    return $sum/scalar(@proc_running_10);
}
# Timer callback, re-queued every 5 seconds.
# Replaces the oldest entry of @proc_running with the current number of
# running processes minus one, and on every 12th call (once per minute)
# replaces the oldest entry of @proc_running_10 with the current average of
# @proc_running.
# The tick counter used to wrap only on the 13th call, i.e. every 65 seconds.
sub sample_rproc { # every 5 seconds
    shift @proc_running;
    push @proc_running,get_cur_running_proc()-1;
    if (++$SAMPLE_TICK>=12) {
        shift @proc_running_10;
        push @proc_running_10,running_proc();
        $SAMPLE_TICK=0;
    }
    My::Select::timeout_requeue(5);
}
#----------------------------------------------------------- stat
our ($CPUS,$BOGOMIPS);
# Populate $CPUS (number of "bogomips" lines) and $BOGOMIPS (sum of their
# values) from /proc/cpuinfo. Does nothing on alpha machines or when the
# file cannot be opened. Callers in this file invoke it only while $CPUS
# is still false, so the counters are not accumulated twice.
sub init_cpuinfo {
    Donald::Tools::is_alpha() and return;
    # Lexical handle instead of the shared bareword L, closed explicitly.
    open my $cpuinfo,'<','/proc/cpuinfo' or return;
    while (my $line=<$cpuinfo>) {
        if ($line =~ /^bogomips\s*:\s*([\d.]+)/) {
            $CPUS++;
            $BOGOMIPS+=$1;
        }
    }
    close $cpuinfo;
    return;
}
# Return load information for this host.
#   alpha : (system load average) obtained through the table() syscall
#   linux : (system load average, pload, free bogomips) when the CPU count is
#           known, otherwise just (system load average)
# Returns undef when the information cannot be obtained.
sub loadavg { # AXP : (system load average) , LINUX: (system load average, pload, freebogo)
    if (Donald::Tools::is_alpha()) {
        my $data=pack 'l!3lx4l!3';
        my $i=syscall(85,3,0,$data,1,length($data)); # table(id=TBL_LOADAVG,index=0,addr=data,nel=1,lel=56)
        my ($l0,$l1,$l2,$scale,$mach0,$mach1,$mach2)=unpack 'l!3lx4l!3',$data;
        $scale or return undef;
        return $l0/$scale;
    } else {
        $CPUS or init_cpuinfo();
        my $running_proc=running_proc();
        # Lexical handle instead of the shared bareword L.
        open my $fh,'<','/proc/loadavg' or return undef;
        my $data;
        my $got=sysread($fh,$data,1024);
        close $fh;
        $got or return undef; # read error or empty file
        # The first field of /proc/loadavg is the 1 minute load average.
        $data =~ /^(\d+\.?\d*)/ or return undef;
        if ($CPUS) {
            # pload = running processes per CPU; the third value is the
            # bogomips of the CPUs not currently occupied (never negative).
            return ($1+0,($running_proc)/$CPUS,($running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS));
        } else {
            return $1+0;
        }
    }
}
our $STAT_TARGET='afk'; # host that receives the load reports and log messages
our $STAT_SEQ=0;        # sequence number of the next load report
# Timer callback: requeue for 600 seconds, then send one 'loadavg.2' report
# to $STAT_TARGET. Nothing is sent when loadavg() yields no load value.
sub send_stat {
    My::Select::timeout_requeue(600);
    my ($load_avg,$pload,$pcapacity)=loadavg();
    return unless defined $load_avg;
    my @report=($my_hostname,$STAT_SEQ++,$load_avg,version_info(),$pload,$pcapacity,$my_unixrev);
    udp_send_message($STAT_TARGET,'loadavg.2',@report);
}
# Handler for a received load report. Only the host named $STAT_TARGET
# forwards it to My::Cluster::Updown::rx_hostannounce; others ignore it.
sub udp_rx_loadavg2 {
    my ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev)=@_;
    return $my_hostname eq $STAT_TARGET
        && My::Cluster::Updown::rx_hostannounce($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev);
}
# Handler for a received 'log' message: hand the text to
# My::Cluster::Updown::msg_text.
sub udp_rx_log {
    my $text=shift;
    return My::Cluster::Updown::msg_text($text);
}
# Concatenate all arguments, prefix them with "<my hostname>: " and send the
# result as a 'log' message to $STAT_TARGET.
sub log_to_stat_target {
    my $text=join('',@_);
    return udp_send_message($STAT_TARGET,'log',$my_hostname.': '.$text);
}
# ----------------------------------------------------------
# Handler for a 'restart' request: start a fresh "clusterd --kill --daemon"
# which then replaces this process.
#
# A double fork is used because kill_previous_server() won't kill its parent:
# the new daemon is exec'ed in a grandchild whose intermediate parent exits
# right away, so the new daemon is not a child of this process.
sub udp_rx_restart {
    my $pid=fork;
    unless (defined $pid) {
        warn "fork: $!\n";
        return;
    }
    if ($pid==0) {
        # intermediate child
        my $pid2=fork;
        # Test $pid2 here. Testing $pid (always 0 in this branch) made the
        # intermediate child exec a second daemon as well.
        if (defined $pid2 && $pid2==0) {
            exec '/usr/sbin/clusterd','--kill','--daemon';
            die "exec failed: $!\n";
        }
        # The intermediate child must never return into the event loop.
        exit 0;
    }
    # Reap the short-lived intermediate child.
    waitpid($pid,0);
    return;
}
# Handler for a 'flush-gidcache' request: write the current time to
# /proc/net/rpc/auth.unix.gid/flush, warning if the file cannot be opened.
sub udp_rx_flush_gidcache {
    my $out;
    if (!open($out,'>','/proc/net/rpc/auth.unix.gid/flush')) {
        warn "proc/net/rpc/auth.unix.gid/flush: $!\n";
    } else {
        print {$out} time();
    }
}
# Handler for a 'make-automaps' request: flush the rpc auth.unix.gid cache
# (same steps as udp_rx_flush_gidcache), then run /sbin/make-automaps.
sub udp_rx_make_automaps {
    udp_rx_flush_gidcache();
    return system('/sbin/make-automaps');
}
# Handler for a 'reexport' request: run "/usr/bin/mxmount --reexport-only".
sub udp_rx_reexport {
    return system('/usr/bin/mxmount','--reexport-only');
}
#----------- tcp mgmt console -----------------------------
our $MGMT_PORT=234;      # TCP port of the management console
our $mgmt_listen_socket; # listening socket created by mgmt_init
our %mgmt_sockets;       # currently connected console sockets
# Open the management console listening socket and register
# mgmt_connect_request as its reader. Dies if the socket cannot be created.
sub mgmt_init {
    $mgmt_listen_socket=IO::Socket::INET->new(
        LocalPort => $MGMT_PORT,
        Proto     => 'tcp',
        Listen    => 1,
        ReuseAddr => 1,
    );
    die "$!\n" unless defined $mgmt_listen_socket;
    My::Select::reader($mgmt_listen_socket,\&mgmt_connect_request);
}
# Reader callback for the management listening socket: accept one console
# connection, register mgmt_receive as its reader, remember it in
# %mgmt_sockets and print the greeting.
sub mgmt_connect_request {
    My::Select::reader_requeue();
    # listen socket ready
    my $socket=$mgmt_listen_socket->accept();
    # accept() returns undef on failure (e.g. the peer already went away);
    # calling a method on undef would kill the daemon.
    defined $socket or return;
    $socket->blocking(0);
    My::Select::reader($socket,sub{mgmt_receive($socket)});
    $mgmt_sockets{$socket}=$socket;
    $socket->print("clusterd ".version_info()." stupid console\n");
    $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n");
}
# Reader callback for one management console connection.
#
# Reads up to 1024 bytes from socket $s and treats them as a single command
# (one trailing CR/LF or LF is stripped). Commands: l, r, v, d,
# "delete HOST", "dup show", "dup clear"; anything else prints the help text.
sub mgmt_receive {
my ($s)=@_;
my $data;
my $len=$s->sysread($data,1024);
# Read error, EOF, or a lone ^D: forget and close the connection.
# The reader is not requeued in this case.
if (!defined $len or $len==0 or $len==1 && $data eq "\cD") {
delete $mgmt_sockets{$s};
$s->close;
###warn "lost connection to mgmt port\n";
return;
}
My::Select::reader_requeue();
$data=~s/\r?\n$//;
# ignore empty input lines
length $data or return;
if ($data eq 'l') {
# status listing; the host table is only printed on the stat target itself
$my_hostname eq $STAT_TARGET and My::Cluster::Updown::status($s);
$s->print("AREA: ",area_config_as_string(),"\n");
$s->print("STAT TARGET: ",$STAT_TARGET,"\n");
$s->print(' (',scalar(localtime),')',"\n");
}
elsif ($data eq 'r') {
# element 7 of each 'UP' host record (the help text calls this the unix revision)
for my $host (sort keys %H) {
if ($H{$host}->[0] eq 'UP') {
$s->printf("%-40s : %s\n",$host,$H{$host}->[7]);
}
}
} elsif ($data eq 'v') {
# raw dump of every host record, one compact Data::Dumper line per host
$Data::Dumper::Terse=1;
$Data::Dumper::Indent=0;
for (sort keys %H) {
$s->printf("%15s : %s\n",$_,Dumper($H{$_}));
}
} elsif ($data eq 'd') {
# debug view of the runnable-process samples and the derived load/capacity
my $running_proc=running_proc();
my $running_proc_10=running_proc_10();
$s->printf("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc);
$s->printf("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10);
$CPUS or init_cpuinfo;
$CPUS and $s->printf("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS);
$CPUS and $s->printf("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS);
} elsif ($data =~ /^delete (\S+)$/) {
My::Cluster::Updown::delete_host($1);
} elsif ($data eq 'dup show') {
# duplicate counters, highest count first
for my $h (sort {$DUPLICATES{$b} <=> $DUPLICATES{$a} } keys %DUPLICATES) {
$s->printf("%-10s : %5d\n",$h,$DUPLICATES{$h});
}
} elsif ($data eq 'dup clear') {
%DUPLICATES=();
} else {
# unknown command: echo it back followed by the help text
$s->print(<<"_EOF_");
unknown command: $data
l : list status
v : dump status array
d : debug (cpu speed calc)
r : dump unix revisions
delete HOST : forget about HOST
dup show : show duplicates stat
dup clear : clear duplicates stat
to exit use ^D
_EOF_
}
}
# Send $msg to every connected management console. It is also printed on
# stdout when running in the foreground without syslog.
sub mgmt_print_all {
    my ($msg)=@_;
    if ($options{'foreground'} and not $options{'syslog'}) {
        print $msg;
    }
    $_->print($msg) for values %mgmt_sockets;
}
#-----------------------------------------------------------
# CLP handler for 'LSOF': stream the output of lsof back over $socket.
#
# $pattern is optional; when defined, only lines containing it as a plain
# substring (index) are sent. Every line goes out as one frame: a 16-bit
# big-endian length followed by the text.
# The work happens in a grandchild created by a double fork. The calling
# process closes its copy of the socket, reaps the intermediate child and
# returns 1. If the first fork fails it warns and returns nothing.
sub clp_rx_LSOF {
my ($socket,$pattern)=@_;
my $pid=fork;
unless (defined $pid) {
warn"$!\n";
return;
}
unless ($pid) {
# intermediate child: fork again and exit at once
my $pid=fork;
defined $pid or die "$!\n";
unless ($pid) {
# grandchild: does the actual work with a blocking socket
$socket->blocking(1);
# -n inhibits the conversion of network numbers to host names for network files.
# -b causes lsof to avoid kernel functions that might block - lstat(2), readlink(2), and stat(2).
# -w disables warning messages.
open P,'timeout -k 92s 90s lsof -n -b -w|' or die "$!\n";
while (<P>) {
next if defined $pattern && index($_,$pattern)<0;
$socket->send(pack('n',length($_)).$_,0);
}
close P;
# non-zero status of the pipeline after close: report it to the peer as one more frame
if ($?) {
$_=sprintf("** lsof timout/error on %s\n",$my_hostname);
$socket->send(pack('n',length($_)).$_,0);
}
close $socket;
exit;
}
exit;
}
# original process: drop our copy of the socket and reap the intermediate child
close $socket;
wait;
return 1;
}
# Run @cmd in a forked helper and relay its output over $socket.
#
# Frame format: 16-bit big-endian length, one channel byte, then the payload.
#   'O' : data read from the command's stdout
#   'E' : data read from the command's stderr
#   'X' : the wait status ($?) of the command, sent once both pipes are closed
# The calling process returns right after the first fork (or warns and
# returns if the fork fails); it does not wait for the helper here.
sub run_cmd {
my ($socket,@cmd)=@_;
my $pid=fork;
unless (defined $pid) {
warn"$!\n";
return;
}
unless ($pid) {
# helper process: create the pipes, start the command, relay its output
my $opipe=new IO::Pipe;
my $epipe=new IO::Pipe;
my $cpid=fork;
defined $cpid or die "$!\n";
unless ($cpid) {
# command process: stdin from /dev/null, stdout/stderr into the pipes
warn "exec ".join(' ',@cmd)."\n";
$opipe->writer();
$epipe->writer();
open STDIN,'<','/dev/null';
open STDOUT,'>&',$opipe;
open STDERR,'>&',$epipe;
exec @cmd;
die "$!\n";
}
# $::SIG{'CHLD'}=sub {
# my $pid=wait;
# my $buffer="X$?";
# $socket->send(pack('n',length($buffer)).$buffer,0);
# exit;
# };
$opipe->reader();
$epipe->reader();
my $ofn=$opipe->fileno;
my $efn=$epipe->fileno;
my ($rvec_in,$wvec_in,$evec_in)=('','','');
vec($rvec_in,$ofn,1)=1;
vec($rvec_in,$efn,1)=1;
# number of pipes that are still open
my $channel=2;
my $buffer;
$socket->blocking(1);
while (1) {
# select() modifies its arguments, so work on copies of the master vectors
my ($rvec,$wvec,$evec)=($rvec_in,$wvec_in,$evec_in);
my $ready=select($rvec,$wvec,$evec,60);
# nothing readable within 60 seconds (or select failed): give up
$ready or die "timeout\n";
if (vec($rvec,$ofn,1)) {
# read at most 1024-1-2 bytes, leaving room for channel byte and length prefix
my $len=$opipe->sysread($buffer,1024-1-2);
defined $len or die "$!\n";
if ($len) {
$socket->send(pack('n',$len+1)."O$buffer",0);
} else {
# EOF on stdout: stop watching this pipe
vec($rvec_in,$ofn,1)=0;
$opipe->close;
$channel--;
}
}
if (vec($rvec,$efn,1)) {
my $len=$epipe->sysread($buffer,1024-1-2);
defined $len or die "$!\n";
if ($len) {
$socket->send(pack('n',$len+1)."E$buffer",0);
} else {
# EOF on stderr: stop watching this pipe
vec($rvec_in,$efn,1)=0;
$epipe->close;
$channel--;
}
}
if ($channel==0) {
# both pipes closed: collect the command's status and report it
my $pid=wait;
my $buffer="X$?";
$socket->send(pack('n',length($buffer)).$buffer,0);
exit;
}
}
}
}
#----------- CLP cluster protocol -----------------------------
our $CLP_PORT=235;      # TCP port of the cluster protocol (CLP)
our $clp_listen_socket; # listening socket created by clp_init
# CLP message name => handler, called as handler($socket,@args)
our %CLP_HANDLER=(
    'CMD'  => \&clp_rx_CMD,
    'LSOF' => \&clp_rx_LSOF,
    'PULL' => \&clp_rx_PULL,
);
# Open the CLP listening socket and register clp_connect_request as its
# reader. Dies if the socket cannot be created.
sub clp_init {
    $clp_listen_socket=IO::Socket::INET->new(
        LocalPort => $CLP_PORT,
        Proto     => 'tcp',
        Listen    => 128,
        ReuseAddr => 1,
    );
    die "$!\n" unless defined $clp_listen_socket;
    My::Select::reader($clp_listen_socket,\&clp_connect_request);
}
# Reader callback for the CLP listening socket: accept one connection and
# register clp_receive as its reader, with a receive buffer private to that
# connection.
sub clp_connect_request {
    My::Select::reader_requeue();
    # listen socket ready
    my $socket=$clp_listen_socket->accept();
    # accept() returns undef on failure (e.g. the peer already went away);
    # calling a method on undef would kill the daemon.
    defined $socket or return;
    $socket->blocking(0);
    my $buffer='';
    My::Select::reader($socket,sub{clp_receive($socket,\$buffer)});
}
# Reader callback for one CLP connection.
#
# Appends received data to the connection buffer ($bufref) and hands every
# complete frame (16-bit big-endian length + payload) to clp_message().
# The reader is requeued unless recv fails, the peer closed the connection,
# or clp_message() returned true.
sub clp_receive {
    my ($s,$bufref)=@_;
    my $chunk;
    return unless defined $s->recv($chunk,$TCP_MAX);
    unless (length $chunk) {
        # peer closed the connection
        $s->close;
        return;
    }
    $$bufref.=$chunk;
    while (length($$bufref)>=2) {
        my $frame_len=2+unpack('n',$$bufref);
        last if length($$bufref)<$frame_len; # frame not complete yet
        my $msg=substr($$bufref,2,$frame_len-2);
        substr($$bufref,0,$frame_len,'');
        return if clp_message($s,$msg);
    }
    My::Select::reader_requeue();
}
# Verify, decode and dispatch one CLP message.
#
# $data is a signed message as produced by sign(). Messages are dropped
# (false return) when no cluster password is loaded, the signature does not
# verify, the payload cannot be decoded, or no handler is registered for
# the message name. Otherwise returns whatever the handler returns.
sub clp_message {
    my ($socket,$data)=@_;
    defined $CLUSTER_PW or return;
    # check_sign() returns undef for a bad signature; stop here rather than
    # handing undef to decode().
    my $payload=check_sign($CLUSTER_PW,$data);
    defined $payload or return;
    my ($handler_name,@args)=decode($payload);
    # decode() yields nothing usable when the payload cannot be thawed
    defined $handler_name or return;
    my $handler=$CLP_HANDLER{$handler_name} or return;
    return $handler->($socket,@args);
}
# clp_send_message($socket, @args)
#
# Sign and encode @args with the cluster password and send them as one
# length-prefixed frame. Returns nothing when no cluster password is loaded,
# 0 when the socket has no peer, otherwise the result of send().
sub clp_send_message {
    my ($s,@args)=@_;
    return unless defined $CLUSTER_PW;
    my $packet=sign($CLUSTER_PW,encode(@args));
    return 0 unless $s->peername;
    return $s->send(pack('n',length $packet).$packet);
}
# CLP handler for 'CMD': pass the command to run_cmd(), close this
# process's copy of the socket and return 1.
sub clp_rx_CMD {
    my $socket=shift;
    run_cmd($socket,@_);
    close $socket;
    return 1;
}
# send_tcp_cp($socket,$cb,$timeout,@args)
#
# Send one cluster protocol message over an asynchronous tcp socket:
# @args are encoded, signed with the cluster password, prefixed with a
# 16-bit big-endian length and handed to My::Select::INET::send_tcp
# together with $timeout and the callback $cb.
#
# The caller must make sure $CLUSTER_PW is valid.
#
sub send_tcp_cp {
    my ($s,$cb,$timeout,@args)=@_;
    my $packet=sign($CLUSTER_PW,encode(@args));
    my $frame=pack('n',length $packet).$packet;
    My::Select::INET::send_tcp($s,$frame,$timeout,$cb);
}
#----------------------------------------------------------
# Load or refresh the cluster password ($CLUSTER_PW) from $CLUSTER_PW_FILE.
#
# - If the file is missing but $OLD_CLUSTER_PW_FILE exists, the old file is
#   copied (first 1024 bytes) to the new location and then removed.
# - The password is (re)read when none is loaded yet or the file's mtime
#   differs from $CLUSTER_PW_TIMESTAMP.
# - If the file is missing or unreadable, $CLUSTER_PW is set to undef.
# Every call that reaches the end schedules another run in 60 seconds.
# Returns true if a password is loaded; the migration error paths return undef.
sub sync_cluster_pw {
my $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE);
# upgrade : move cluster password file from /root to /etc
if (!$st && -e $OLD_CLUSTER_PW_FILE) {
warn "upgrading cluster password file location $OLD_CLUSTER_PW_FILE -> $CLUSTER_PW_FILE\n";
my $in=new IO::File $OLD_CLUSTER_PW_FILE,'<';
unless (defined $in) {warn "$OLD_CLUSTER_PW_FILE: $!\n";return undef;}
# the new file is created with mode 0600
my $out=new IO::File $CLUSTER_PW_FILE,O_WRONLY|O_CREAT,0600;
unless (defined $out) {warn "$CLUSTER_PW_FILE: $!\n";return undef;}
my $data;
# NOTE(review): the results of read/write/close are not checked here
$in->read($data,1024);
$out->write($data);
$in->close;
$out->close;
$st=Donald::FileInfo->lstat($CLUSTER_PW_FILE);
defined $st or die "$CLUSTER_PW_FILE: $!\n";
unlink $OLD_CLUSTER_PW_FILE;
}
if ($st) {
# (re)read only when nothing is loaded yet or the file changed
if (!defined $CLUSTER_PW or $CLUSTER_PW_TIMESTAMP != $st->mtime) {
my $fh=new IO::File $CLUSTER_PW_FILE,'<';
if (defined ($fh)) {
defined $CLUSTER_PW and warn "update cluster password\n";
$fh->read($CLUSTER_PW,1024);
$CLUSTER_PW_TIMESTAMP=$st->mtime;
} else {
# warn only on the transition from "loaded" to "unavailable"
defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n";
$CLUSTER_PW=undef;
}
}
} else {
defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n";
$CLUSTER_PW=undef;
}
My::Select::timeout(60,\&sync_cluster_pw);
return defined $CLUSTER_PW;
}
#------------------------------------------------------------
# area routing
# area router hostname => broadcast address of the network it serves
our %AREA_ROUTER=(
    wtf => '141.14.31.255',
);
our $area_socket; # udp socket of the area router, set by init_area
# If this host is listed in %AREA_ROUTER, open the area udp socket on
# $UDP_PORT+1 and register area_message as its reader. Does nothing on
# other hosts; dies if the socket cannot be created.
sub init_area {
    return unless exists $AREA_ROUTER{$my_hostname};
    warn "I am area router for $AREA_ROUTER{$my_hostname}\n";
    $area_socket=IO::Socket::INET->new(Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n";
    My::Select::reader($area_socket,sub{area_message($area_socket)});
}
# Reader callback for the area router socket: receive one datagram and
# re-send it unchanged to the broadcast address configured for this host
# in %AREA_ROUTER, on $UDP_PORT.
sub area_message {
    my ($area_socket)=@_;
    my $data;
    # The sender's address is not needed. The previous code unpacked it
    # anyway, and that dies when recv() fails and returns undef.
    if (defined $area_socket->recv($data,$UDP_MAX)) {
        $donald_s->send_data($AREA_ROUTER{$my_hostname},$UDP_PORT,$data); # broadcast to our network
    } else {
        warn "area socket: $!\n";
    }
    My::Select::reader_requeue();
}
# Sign and encode @args and send the datagram to every area router, i.e.
# to each key of %AREA_ROUTER on $UDP_PORT+1.
# Does nothing when no cluster password is loaded.
sub udp_broadcast_message {
    my ($donald_s,@args)=@_;
    return unless defined $CLUSTER_PW;
    my $packet=sign($CLUSTER_PW,encode(@args));
    foreach my $router (keys %AREA_ROUTER) {
        $donald_s->send_data($router,$UDP_PORT+1,$packet);
    }
}
# Render %AREA_ROUTER as a comma separated list of "router => address".
sub area_config_as_string {
    my @pairs;
    foreach my $router (keys %AREA_ROUTER) {
        push @pairs,"$router => $AREA_ROUTER{$router}";
    }
    return join(',',@pairs);
}
#------------------------------------------------------------
our $PROG_FILE; # saved $0 - may be relative path
our $PROG_MTIME;
# Periodic self-upgrade check, rescheduled every 60 seconds.
#
# The first successful call records the mtime of the program file. Later
# calls re-exec the program (keeping --foreground / --syslog) as soon as
# that mtime has changed.
sub check_progfile_status {
    defined $PROG_FILE or $PROG_FILE=$0;
    my @st=lstat $PROG_FILE;
    # A failed lstat (e.g. the file is being replaced right now) must not
    # end the periodic check, so fall through to the reschedule below.
    if (@st) {
        if (!defined $PROG_MTIME) {
            $PROG_MTIME=$st[9];
        } elsif ($st[9] != $PROG_MTIME) {
            warn "progfile $PROG_FILE has changed - upgrade restart from version ".version_info()."\n";
            exec $PROG_FILE,'--daemon',($options{'foreground'}?'--foreground':()),($options{'syslog'}?'--syslog':());
            # only reached if exec failed; keep running and retry next time
            warn "exec $PROG_FILE: $!\n";
        }
    }
    My::Select::timeout(60,\&check_progfile_status);
}
# Version string derived from the program file's mtime in local time,
# formatted like '20090617-155314'. Returns '?' while $PROG_MTIME is unset.
sub version_info {
    return '?' unless defined $PROG_MTIME;
    my ($sec,$min,$hour,$mday,$mon,$year)=localtime($PROG_MTIME);
    return sprintf '%4d%02d%02d-%02d%02d%02d',$year+1900,$mon+1,$mday,$hour,$min,$sec;
}
#------------------------------------------------------------
# Write a load-debugging snapshot to /var/log/pload_debug.txt, replacing any
# previous snapshot: the reason $why with a timestamp, the runnable-process
# samples and derived figures, then the output of uptime, ps and the
# contents of /proc/stat and /proc/swaps.
# Silently does nothing if the file cannot be opened.
sub pload_debug {
    my ($why)=@_;
    open my $log,'>','/var/log/pload_debug.txt' or return;
    print {$log} "$why at ".scalar(localtime),"\n";
    my $running_proc=running_proc();
    my $running_proc_10=running_proc_10();
    printf {$log} ("RPROC : %s : %.2f\n",join(',',@proc_running),$running_proc);
    printf {$log} ("RPROC_10 : %s : %.2f\n",join(',',map({sprintf '%.2f',$_} @proc_running_10)),$running_proc_10);
    $CPUS or init_cpuinfo();
    $CPUS and printf {$log} ("run: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc, $CPUS,$BOGOMIPS,$running_proc/$CPUS, $running_proc>=$CPUS?0:($CPUS-$running_proc)*$BOGOMIPS);
    $CPUS and printf {$log} ("run10: %.2f , CPUs: %d , bogo %.2f , load %.2f , capacity %.1f\n",$running_proc_10,$CPUS,$BOGOMIPS,$running_proc_10/$CPUS,$running_proc_10>=$CPUS?0:($CPUS-$running_proc_10)*$BOGOMIPS);
    # Close (and flush) the header before the shell commands append. If the
    # handle stayed open, the buffered header would be written at offset 0
    # afterwards, on top of the start of the appended output.
    close $log;
    system "uptime>>/var/log/pload_debug.txt";
    system "ps -AlfT>>/var/log/pload_debug.txt";
    system "cat /proc/stat>>/var/log/pload_debug.txt";
    system "cat /proc/swaps>>/var/log/pload_debug.txt";
}
# Timer callback, requeued every 600 seconds: warn when the 10 minute
# average of runnable processes per CPU exceeds 150%.
# Does nothing beyond requeueing while the CPU count is unknown.
sub check_overload() {
    My::Select::timeout_requeue(600);
    $CPUS or return;
    my $pload_10=running_proc_10()/$CPUS;
    if ($pload_10>1.5) {
        warn(sprintf("pload>150%% (%3.1f%%)\n",$pload_10*100));
    }
}
#####################################################
# Client-mode state shared by exec_at / lsof and their receive callbacks.
# $slave counts the remote connections still open; the *_rx callbacks
# decrement it on EOF and exit the process once it reaches zero.
my $slave=0;
# Exit status used for that exit; cmd_msg sets it to 1 when a remote
# command reports a non-zero status.
my $exit_value=0;
# Return the host names printed by "hostconfig --list $filter".
# Only the first line of output is read and split on whitespace; no output
# yields an empty list. Dies if the command cannot be started.
sub expand_hostconfig_hosts {
    my ($filter)=@_;
    # The message used to end in a typographic quote plus "n" instead of
    # a newline escape.
    open my $p,'-|','hostconfig','--list',$filter or die "hostconfig: $!\n";
    my $line=readline($p);
    # no output at all: avoid splitting undef
    defined $line or $line='';
    return split ' ',$line;
}
# Client mode: run @cmd on $host through the CLP 'CMD' message and relay
# its output through cmd_rx. Runs the event loop; the process exits from
# cmd_rx once the connection is closed.
# Dies if the cluster password is unavailable or the connection fails.
sub exec_at {
    my ($host,@cmd)=@_;
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    my $s=IO::Socket::INET->new(PeerAddr=>$host,PeerPort=>$CLP_PORT);
    die "$host: $!\n" unless defined $s;
    clp_send_message($s,'CMD',@cmd);
    # frame buffer plus partial-line buffers for stdout and stderr
    my ($pbuffer,$olbuffer,$elbuffer)=('','','');
    My::Select::reader($s,sub{cmd_rx($host,$s,\$pbuffer,\$olbuffer,\$elbuffer)});
    $slave=1;
    My::Select::run() if $slave;
}
# Client mode: send an 'LSOF' request with $pattern to every host listed by
# hostconfig for 'amd' (except 'tux') and print the replies through
# lsof_rx. Hosts that cannot be reached are reported and skipped.
# Runs the event loop if at least one connection was opened.
sub lsof {
    my ($pattern)=@_;
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    foreach my $host (sort(expand_hostconfig_hosts('amd'))) {
        next if $host eq 'tux';
        my $s=IO::Socket::INET->new(PeerAddr=>$host,PeerPort=>$CLP_PORT);
        if (!defined $s) {
            warn "$host: $!\n";
            next;
        }
        clp_send_message($s,'LSOF',$pattern);
        # frame buffer and partial-line buffer private to this connection
        my ($pbuffer,$olbuffer)=('','');
        My::Select::reader($s,sub{lsof_rx($host,$s,\$pbuffer,\$olbuffer)});
        $slave++;
    }
    My::Select::run() if $slave;
}
# Reader callback for one 'LSOF' reply connection.
#
# Appends received data to the frame buffer ($pbufref) and passes every
# complete frame (16-bit big-endian length + payload) to lsof_msg(). On EOF
# the socket is closed and the process exits with $exit_value once no
# connection is left.
sub lsof_rx {
    my ($host,$s,$pbufref,$olbufref)=@_;
    my $chunk;
    return unless defined $s->recv($chunk,$TCP_MAX*2);
    unless (length $chunk) {
        # peer closed the connection
        close $s;
        $slave--;
        exit $exit_value if !$slave;
        return;
    }
    $$pbufref.=$chunk;
    while (length($$pbufref)>=2) {
        my $frame_len=2+unpack('n',$$pbufref);
        last if length($$pbufref)<$frame_len; # frame not complete yet
        my $msg=substr($$pbufref,2,$frame_len-2);
        substr($$pbufref,0,$frame_len,'');
        lsof_msg($host,$s,$msg,$olbufref);
    }
    My::Select::reader_requeue();
}
# Print the complete lines contained in one 'LSOF' payload, each prefixed
# with "$host ". Text after the last newline is kept in $$olbufref and
# prepended to the next payload.
sub lsof_msg {
    my ($host,$s,$data,$olbufref)=@_;
    my $text=$$olbufref.$data;
    # walk over the newline-terminated lines from the start of the text
    while ($text =~ /\G([^\n]*\n)/gc) {
        print "$host $1";
    }
    # whatever follows the last complete line is carried over
    $$olbufref=substr($text,pos($text)||0);
}
# Reader callback for the 'CMD' reply connection.
#
# Appends received data to the frame buffer ($pbufref) and passes every
# complete frame (16-bit big-endian length + payload) to cmd_msg(). On EOF
# any output still held in the partial-line buffers is printed, the socket
# is closed and the process exits with $exit_value once no connection is
# left.
sub cmd_rx {
    my ($host,$s,$pbufref,$olbufref,$elbufref)=@_;
    my $data;
    defined $s->recv($data,$TCP_MAX*2) or return;
    if (!length($data)) {
        # cmd_msg() only prints newline-terminated lines. Flush what is
        # left, otherwise a last line without trailing newline is lost.
        if (length $$olbufref) {
            print "$host: $$olbufref\n";
            $$olbufref='';
        }
        if (length $$elbufref) {
            print STDERR "$host: $$elbufref\n";
            $$elbufref='';
        }
        close $s;
        --$slave;
        exit $exit_value unless $slave;
        return;
    }
    $$pbufref.=$data;
    while (1) {
        last if length($$pbufref)<2;
        my $l=unpack('n',$$pbufref);
        last if length($$pbufref)<2+$l; # frame not complete yet
        my $msg=substr($$pbufref,2,$l);
        $$pbufref=substr($$pbufref,2+$l);
        cmd_msg($host,$s,$msg,$olbufref,$elbufref);
    }
    My::Select::reader_requeue();
}
# Handle one 'CMD' reply payload. The first byte selects the channel:
#   'X' : the rest is the remote wait status; non-zero sets $exit_value to 1
#   'O' : remote stdout - complete lines are printed as "$host: line"
#   'E' : remote stderr - complete lines are printed to STDERR the same way
# Text after the last newline is kept in the channel's buffer
# ($$olbufref / $$elbufref) and prepended to that channel's next payload.
# Payloads of any other channel are ignored.
sub cmd_msg {
    my ($host,$s,$data,$olbufref,$elbufref)=@_;
    my $channel=substr($data,0,1);
    if ($channel eq 'X') {
        my $status=substr($data,1);
        $exit_value=1 if $status!=0;
        return;
    }
    return unless $channel eq 'O' || $channel eq 'E';
    my $to_stdout=$channel eq 'O';
    my $bufref=$to_stdout ? $olbufref : $elbufref;
    my $text=$$bufref.substr($data,1);
    # walk over the newline-terminated lines from the start of the text
    while ($text =~ /\G([^\n]*\n)/gc) {
        if ($to_stdout) {
            print "$host: $1";
        } else {
            print STDERR "$host: $1";
        }
    }
    # whatever follows the last complete line is carried over
    $$bufref=substr($text,pos($text)||0);
}
our $SYS_SENDFILE=40; # /usr/include/asm/unistd_64.h
# Thin wrapper around the sendfile system call.
# $out_fd and $in_fd are Perl file handles (fileno is applied here).
# Returns the syscall result, or undef if it is negative.
# NOTE(review): $offset is handed to syscall() as given; sendfile(2) expects
# a pointer there, so only 0 (as passed by the caller in this file) looks
# safe - confirm before passing anything else.
sub sendfile {
    my ($out_fd,$in_fd,$offset,$count)=@_;
    my $sent=syscall($SYS_SENDFILE,fileno($out_fd),fileno($in_fd),$offset,$count);
    if ($sent<0) {
        return undef;
    }
    return $sent;
}
# CLP handler for 'PULL': send the file described by $st_want to the peer.
#
# The request is refused (warning, empty return) unless the local file is a
# plain file whose size and mtime still match $st_want, and it can be opened.
# The data is sent from a writer callback using sendfile(); a 5 second
# timeout cancels the socket handle.
# NOTE(review): the return value is whatever My::Select::writer returns, and
# clp_receive stops reading from the socket if it is true - confirm.
sub clp_rx_PULL {
my ($s,$st_want)=@_;
my $st_is=Donald::FileInfo->lstat($st_want->name);
if (!defined $st_is or $st_is->type ne 'F' or $st_is->size != $st_want->size or $st_is->mtime != $st_want->mtime) {
warn $st_want->name." requested by ".$s->peerhost.": no longer available\n";
return;
}
my $fh;
unless (open $fh,'<',$st_want->name) {
warn $st_want->name.": $!\n";
return;
}
# bytes still to be sent
my $bytes=$st_is->size;
my $cb_tmo=sub { My::Select::cancel_handle($s); };
my $cb_write=sub {
# offset 0 is passed on every call; see sendfile() for how it is handed to the syscall
my $l=sendfile($s,$fh,0,$bytes);
unless (defined $l) {
warn "$!";
return;
}
$bytes-=$l;
if ($bytes) {
# not everything sent yet: wait until the socket is writable again
My::Select::writer_requeue if $bytes;
} else {
close($s);
}
};
# NOTE(review): the timeout is not visibly cancelled when the transfer completes - confirm
My::Select::timeout(5,$cb_tmo);
My::Select::writer($s,$cb_write);
}
# Handler for a 'push' announcement: host $ip offers the file described by
# $st_want.
#
# Announcements from ourselves, for anything but plain files, or for a file
# that already matches locally (type, size, mtime, uid, gid, perm) are
# ignored. Empty files are created locally. Everything else is fetched from
# $ip with a CLP 'PULL' request into "$filename.tmp"; owner and mode are
# applied to the temp file, which is then renamed into place and given the
# announced mtime.
sub udp_rx_push {
    my ($ip,$st_want)=@_;
    my $filename=$st_want->name;
    my $tmp_filename="$filename.tmp";
    $ip eq $my_ip and return;
    my $st_is=Donald::FileInfo->lstat($st_want->name);
    unless ($st_want->type eq 'F') {
        warn "$filename: type ".$st_want->type." not yet implemented\n";
        return;
    }
    if ($st_is
        && $st_is->type eq 'F'
        && $st_is->size == $st_want->size
        && $st_is->mtime == $st_want->mtime
        && $st_is->uid == $st_want->uid
        && $st_is->gid == $st_want->gid
        && $st_is->perm == $st_want->perm
    ) {
        warn "$filename: already okay\n";
        return;
    }
    if ($st_want->size==0) {
        # O_TRUNC: a stale temp file from an earlier, interrupted transfer
        # must not leave its old content behind.
        my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_TRUNC,0);
        defined $fh or return warn "$tmp_filename: $!\n";
        $fh->close or return warn "$tmp_filename: $!\n";
        chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
        chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
        rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n";
        utime($st_want->mtime,$st_want->mtime,$filename);
        warn "installed (empty) $filename\n";
        return;
    }
    my $s;
    $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,$TCP_TIMEOUT,sub {
        $! and return warn "$ip: $!\n";
        send_tcp_cp($s,sub {
            $! and return warn "$ip: $!\n";
            # O_TRUNC for the same reason as above: a longer stale temp
            # file would otherwise leave trailing garbage.
            my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_TRUNC,0);
            defined $fh or return warn "$tmp_filename: $!\n";
            my $cb;
            my $bytes=$st_want->size; # bytes still expected
            $cb=sub {
                # note, we need to break the circular references $cb of our caller, if no longer needed
                my ($buf)=@_;
                if ($!) { warn "$ip: $!\n";$cb=undef;return; }
                if (length($buf)==0) { warn "$ip: EOF\n";$cb=undef;return;}
                unless (print $fh $buf) {
                    # write failed: give up and release the callback as well
                    warn "$tmp_filename: $!\n";
                    $cb=undef;
                    return;
                }
                $bytes-=length($buf);
                if ($bytes>0) {
                    My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT);
                    return;
                }
                # all data received: install the file
                $cb=undef;
                close $fh or return warn "$tmp_filename: $!\n";
                chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
                chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
                rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n";
                utime($st_want->mtime,$st_want->mtime,$filename);
                warn "installed $filename\n";
            };
            My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT);
        },$TCP_TIMEOUT,'PULL',$st_want);
    });
}
# Client mode "push files....": announce each of @files to the cluster with
# a 'push' broadcast carrying our ip and the file's stat information.
# All names must be absolute paths of existing, readable plain files. A
# relative path only warns and returns; the other problems are fatal.
sub cmd_push {
    my @files=@_;
    foreach my $path (@files) {
        return warn "$path: please use absolute path\n" unless $path =~ m"^/";
        die "$path: no such file\n" unless -e $path;
    }
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=My::Select::INET->new(Proto=>'udp') or die "$!\n";
    foreach my $path (@files) {
        my $st=Donald::FileInfo->lstat($path);
        die "$path: $!\n" unless defined $st;
        die "$path: only plain files currently supported\n" unless $st->type eq 'F';
        # opened only to verify that the file is readable
        open my $test,'<',$path or die "$path: $!\n";
        udp_broadcast_message($donald_s,'push',$my_ip,$st);
    }
}
#------------------------------------------------------------
our $TRUSTCHECK_PORT=236;      # TCP port of the trust check service
our $trustcheck_listen_socket; # listening socket created by trustcheck_init
# Open the trust check listening socket and register
# trustcheck_connect_request as its reader. Dies if the socket cannot be
# created.
sub trustcheck_init {
    $trustcheck_listen_socket=IO::Socket::INET->new(
        LocalPort => $TRUSTCHECK_PORT,
        Proto     => 'tcp',
        Listen    => 10,
        ReuseAddr => 1,
    );
    die "$!\n" unless defined $trustcheck_listen_socket;
    My::Select::reader($trustcheck_listen_socket,\&trustcheck_connect_request);
}
# Reader callback for the trust check listening socket.
#
# Accepts one connection, resolves the peer address to a host name and asks
# "hostconfig --host NAME amd" about it. The peer receives "I trust you"
# when hostconfig exits with status 0 and "I don't trust you" when it exits
# with status 1; any other outcome closes the connection without a reply.
# A peer whose address does not resolve is treated as not trusted.
sub trustcheck_connect_request {
    My::Select::reader_requeue();
    my $socket=$trustcheck_listen_socket->accept();
    # accept() returns undef on failure; calling a method on undef would
    # kill the daemon.
    defined $socket or return;
    $socket->blocking(0);
    my $hostname = gethostbyaddr(inet_aton($socket->peerhost()), AF_INET);
    unless (defined $hostname) {
        # no reverse lookup result: there is no name to ask hostconfig about
        $socket->send("I don't trust you\n", 0);
        close($socket);
        return;
    }
    system 'hostconfig','--host',$hostname,'amd';
    if ($? == 0) {
        $socket->send("I trust you\n", 0);
    } elsif ($? == 256) {
        # $? == 256 means exit status 1
        $socket->send("I don't trust you\n", 0);
    }
    close($socket);
}
#------------------------------------------------------------
# Usage text shown for invalid command lines. The here-document is
# interpolating so that $0 expands to the program name (a single-quoted
# here-document printed a literal "$0"); the "@" of "@node" is escaped.
use constant USAGE => <<"__EOF__";
usage: $0 [options]
--push file # broadcast this file
--push-amd-tar # broadcast /etc/amd
--send-restart # broadcast a restart request to all nodes
--exec mkmotd # execute /usr/sbin/mkmotd.pl on all nodes
--exec \@node cmd [args...] # execute cmd on node
--flush-gidcache # flush rpc auth.unix.gid cache on all nodes
--make-automaps # execute /usr/sbin/make-automaps on all nodes
--reexport # execute /usr/bin/mxmount --reexport-only on all nodes
--lsof=pattern
--kill # try to kill a running server
--daemon # start a daemon
--kill # try to kill previous server first
--foreground # stay in foreground, log to stderr
--syslog # log to syslog instead of stderr
push files.... # push files over tcp
__EOF__
use Getopt::Long;
# Parse the command line into %options; an invalid command line prints the
# usage text and exits. Each value stays undef unless its option was given.
# Note that --push-amd-tar is stored under the key 'push_amd_tar'
# (underscores), while the other multi-word options keep their dashes.
GetOptions
(
'kill' => \$options{'kill'},
'daemon' => \$options{'daemon'},
'push=s' => \$options{'push'},
'exec=s' => \$options{'exec'},
'foreground' => \$options{'foreground'},
'syslog' => \$options{'syslog'},
'push-amd-tar' => \$options{'push_amd_tar'},
'send-restart' => \$options{'send-restart'},
'flush-gidcache' => \$options{'flush-gidcache'},
'make-automaps' => \$options{'make-automaps'},
'reexport' => \$options{'reexport'},
'lsof=s' => \$options{'lsof'},
) or die USAGE;
# Main dispatch: exactly one mode is selected, tested in this order:
# --push, --exec, --push-amd-tar, --send-restart, --flush-gidcache,
# --make-automaps, --reexport, --daemon, --lsof, --kill, and finally the
# "push files...." sub-command taken from the remaining arguments.
if (defined $options{'push'}) {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n";
    push_file($donald_s,$options{'push'});
} elsif (defined $options{'exec'}) {
    if (substr($options{'exec'},0,1) eq '@') {
        # "--exec @node cmd [args...]": run cmd on one node.
        # A node name must follow the '@'. This replaces the former
        # length(length(...)>1) test, which gave the same result only by accident.
        length($options{'exec'})>1 or die USAGE;
        @ARGV>=1 or die USAGE;
        exec_at(substr($options{'exec'},1),@ARGV);
    } else {
        # "--exec name": broadcast an exec request to all nodes
        sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
        $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n";
        send_exec($donald_s,$options{'exec'});
    }
} elsif (defined $options{'push_amd_tar'}) {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n";
    push_amd_tar($donald_s);
} elsif (defined $options{'send-restart'}) {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n";
    udp_broadcast_message($donald_s,'restart');
} elsif (defined $options{'flush-gidcache'}) {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n";
    udp_broadcast_message($donald_s,'flush-gidcache');
} elsif (defined $options{'make-automaps'}) {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n";
    udp_broadcast_message($donald_s,'make-automaps');
} elsif (defined $options{'reexport'}) {
    sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
    $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n";
    udp_broadcast_message($donald_s,'reexport');
} elsif (defined $options{'daemon'}) {
    # daemon mode; with --kill a running instance is killed first
    $options{'kill'} and Donald::Tools::kill_previous_server('clusterd') and sleep 2;
    $SIG{PIPE}='IGNORE';
    $donald_s=new My::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n";
    $donald_s->receive_data(\&udp_message,$donald_s);
    unless ($options{'foreground'}) {
        # go to the background: the parent exits, the child carries on
        my $pid=fork;
        defined $pid or die "$!\n";
        $pid and exit 0;
    }
    if ($options{'syslog'} or not $options{'foreground'}) {
        # route warnings and fatal errors to syslog and detach stdio
        openlog('clusterd','pid','daemon');
        Sys::Syslog::setlogsock('unix'); # with 'native' we get EOLs in the logfile, option "noeol" doesn't work
        $SIG{__WARN__} = sub { syslog('warning',@_); };
        $SIG{__DIE__} = sub { syslog('crit',@_);syslog('crit','exiting');exit 1;};
        open (STDOUT,'>','/dev/null');
        open (STDERR,'>','/dev/null');
        open (STDIN,'<','/dev/null');
    }
    check_progfile_status();
    warn "server started - ".version_info()."\n";
    init_area();
    mgmt_init();
    clp_init();
    trustcheck_init();
    sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n";
    My::Select::timeout(60,\&purge_old_receiver);
    # random initial delay before the first load report
    My::Select::timeout(rand(60),\&send_stat);
    My::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha;
    $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init();
    $my_hostname eq 'macheteinfach' and My::NetlogReceiver::init();
    My::Select::timeout(600,\&check_overload);
    My::Select::run();
} elsif ($options{'lsof'}) {
    lsof($options{'lsof'});
} elsif ($options{'kill'}) {
    Donald::Tools::kill_previous_server('clusterd');
} else {
    # no option selected a mode: expect a sub-command in @ARGV
    @ARGV or die USAGE;
    my ($cmd,@args)=@ARGV;
    if ($cmd eq 'push') {
        @args>0 or die USAGE;
        cmd_push(@args);
    } else {
        die USAGE;
    }
}