diff --git a/clusterd/clusterd b/clusterd/clusterd index afcb828..44fe3fe 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -1,81 +1,5 @@ #! /usr/local/system/perl/bin/perl -use warnings; -use strict; - -use POSIX; -use IO::Pipe; -use Digest::MD5; - -#------------------------------------- -package Donald::Tools; - -our $VERSION = '1.00'; - -use Storable; -use Digest::MD5; - -sub hostname { - our $hostname; - unless (defined $hostname) { - $hostname=lc `/bin/hostname`; - chomp($hostname); - $hostname =~ s/\.molgen\.mpg\.de$//; - } - return $hostname; -} - -sub machine { - our $machine; - chomp($machine=`uname -m`) unless defined $machine; - return $machine; -} - -sub uptime { - open U,'<','/proc/uptime' or die "/proc/uptime: $!\n"; - my $data; - sysread(U,$data,1024); - close U; - $data=~ /^(\d+\.?\d*)/ or die "bad data from /proc/uptime: $data\n"; - return $1+0; -} - -sub encode { - return Storable::nfreeze([@_]); -} - -sub sign { - my ($password,$data)=@_; - return Digest::MD5::md5($password.$data).$data; # 16 byte prefix -} - -sub check_sign { # signed-data -> undef or signed-data -> data - my ($password,$data)=@_; - length $data>16 or return undef; - my $rx_digest=substr($data,0,16); - my $signature=Digest::MD5::md5($password.substr($data,16)); - $rx_digest eq $signature or return undef; - return substr($data,16); -} - -sub decode { - my ($data)=@_; - my $msg; - eval { - $msg=Storable::thaw($data); - }; - $@ and return undef; - return @$msg; -} - -#------------------------------------- -package main; - -*encode=*Donald::Tools::encode{CODE}; -*sign=*Donald::Tools::sign{CODE}; -*check_sign=*Donald::Tools::check_sign{CODE}; -*decode=*Donald::Tools::decode{CODE}; - #------------------------------------- package Donald::FileInfo; @@ -200,7 +124,16 @@ sub lstat { #------------------------------------- package My::Select; -our $time=Donald::Tools::uptime(); +sub uptime { + open U,'<','/proc/uptime' or die "/proc/uptime: $!\n"; + my $data; + sysread(U,$data,1024); + close U; + $data=~ /^(\d+\.?\d*)/ or die "bad data from /proc/uptime: $data\n"; + return $1+0; +} + +our $time = uptime(); sub My::Select::time { return $time; @@ -270,7 +203,7 @@ sub cancel_handle { sub run { while (1) { - $time=Donald::Tools::uptime(); + $time = uptime(); while (@TIMER && $TIMER[0]->[0]<=$time) { $active_timer_cb=(shift @TIMER)->[1]; $active_timer_cb->(); @@ -283,6 +216,7 @@ sub run { for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 } ; for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ; + wait; my $ready=select($rvec,$wvec,$evec,1); if ($ready>0) { for (my $i=0;$i<@READER;$i++) { @@ -321,14 +255,6 @@ use IO::Socket::INET; our $UDP_MAX = 1472; -our $SOL_SOCKET=1; -our $SO_ERROR=4; - -sub get_socket_error { - my ($s)=@_; - return unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR)); -} - sub new { # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) my ($class,@args)=@_; our $socket=new IO::Socket::INET (@args) or return undef; @@ -362,98 +288,6 @@ sub receive_data { My::Select::reader($$self,$receive_data_cb); } -# send_tcp($socket,$data,$timeout,$cb); -# -# send data asynchronously over noblocking tcp socket -# call callback when done ($!=0) or on error ($! set) -# -# all arguments required -# -sub send_tcp { - my ($s,$data,$timeout,$cb)=@_; - my $len=$s->send($data,0); - defined $len or return $cb->(); - if ($len==length($data)) { - $!=0; - $cb->(); - return; - } - my $pos=$len; - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(); - }; - my $cb_write=sub { - My::Select::timeout_cancel($cb_tmo); - my $len=send($s,substr($data,$pos),0); - defined $len or return $cb->(); - if ($len==length($data)-$pos) { - $!=0; - $cb->(); - return; - } - $pos+=$len; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer_requeue(); - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer($s,$cb_write); -} - -# $socket = connect_tcp ($ip,$port,$timeout,$cb) -# -# asynchronously connect to tcp socket. -# call callback when done or on error (with $! set) -# -# all arguments required -# -sub connect_tcp { - my ($ip,$port,$timeout,$cb)=@_; - - my $s=new IO::Socket::INET (PeerAddr=>$ip,PeerPort=>$port,Blocking=>0); - defined $s or return $cb->(); - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(); - }; - my $cb_write=sub { - My::Select::timeout_cancel($cb_tmo); - $!=get_socket_error($s); - $cb->(); - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer($s,$cb_write); - return $s; -} - -# read_with_timeout($socket,$callback,$timeout) -# -# asynchronously read from tcp socket. -# -sub read_with_timeout { - my ($s,$cb,$timeout)=@_; - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(undef); - }; - my $cb_read=sub { - My::Select::timeout_cancel($cb_tmo); - my $buf=''; - my $l=sysread($s,$buf,102400,0); - if (!defined $l) { - $cb->(undef); - } else { - $!=0; - $cb->($buf); - } - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::reader($s,$cb_read) -} - #-------------------------------------- package My::Cluster::Updown; @@ -687,16 +521,16 @@ sub init { #------------------------------------------------------------------------ package main; +use warnings; use strict; use IO::File; use Sys::Syslog; use IO::Socket::INET; use Data::Dumper; +use IO::Pipe; +use Digest::MD5; -our $UDP_MAX=1472; our $UDP_PORT=234; -our $BC_RATE = 8; # packets per second broadcast -our $TCP_TIMEOUT=30; # default timeout for tcp processing our (%options); # RUN OPTIONS @@ -732,23 +566,53 @@ our $CLUSTER_PW_TIMESTAMP=0; $ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}:''); # for ps , tar (gnu!) +sub encode { + return Storable::nfreeze([@_]); +} + +sub sign { + my ($password,$data)=@_; + return Digest::MD5::md5($password.$data).$data; # 16 byte prefix +} + +sub check_sign { # signed-data -> undef or signed-data -> data + my ($password,$data)=@_; + length $data>16 or return undef; + my $rx_digest=substr($data,0,16); + my $signature=Digest::MD5::md5($password.substr($data,16)); + $rx_digest eq $signature or return undef; + return substr($data,16); +} + +sub decode { + my ($data)=@_; + my $msg; + eval { + $msg=Storable::thaw($data); + }; + $@ and return undef; + return @$msg; +} + +our %TRUSTED_IP = ( + '141.14.28.170' => 1, # afk + '141.14.16.131' => 1, # wtf +); + +sub is_trusted_ip { + my ($ip) = @_; + return exists $TRUSTED_IP{$ip} ? 1 : 0; +} + #---------------------------------------------------------- UDP our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234) our %UDP_HANDLER = ( - 'filedata' => \&udp_rx_filedata, - 'filedata.2' => \&udp_rx_filedata, - 'amdtardata' => \&udp_rx_amdtardata, 'loadavg.2' => \&udp_rx_loadavg2, 'restart' => \&udp_rx_restart, - 'flush-gidcache' => \&udp_rx_flush_gidcache, - 'make-automaps' => \&udp_rx_make_automaps, - 'reexport' => \&udp_rx_reexport, 'log' => \&udp_rx_log, - 'exec' => \&udp_rx_exec, 'exec.2' => \&udp_rx_exec2, - 'push' => \&udp_rx_push, 'push.2' => \&udp_rx_push2, ); @@ -771,86 +635,6 @@ sub udp_send_message { # udp_send_message( dst, @args) # dst='141.14.31.255' 'zo #---------------------------------------------------------- -sub push_amd_tar { - my ($donald_s)=@_; - my $filename='/tmp/amd.tar'; - - my $pid=fork; - defined $pid or return warn "$!\n"; - unless($pid) { - chdir '/etc/amd' or die "/etc/amd: $!\n"; - exec 'tar','cf',$filename,'.'; - die "$!\n"; - } - wait; - $? and return; - - my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; - my $digest=Digest::MD5->new->addfile($fh)->digest; - warn "tar digest is ",Digest::MD5::md5_hex($digest),"\n"; - - $pid=fork; - defined $pid or return warn "$!\n"; - unless($pid) { - exec 'gzip','-f',$filename; - die "$!\n"; - } - wait; - $? and return; - - $filename='/tmp/amd.tar.gz'; - - my $st=Donald::FileInfo->lstat($filename); - defined $st or return warn "$filename: $!\n"; - $st->type eq 'F' or return warn "$filename: not a plain file\n"; - - $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; - - my $i=0; - for (my $pos=0;$pos<$st->size;$pos+=1024) { - my $data; - defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; - # warn "send bytes $pos to ",$pos+length($data),"\n"; - ++$i % $BC_RATE or sleep 1; - udp_broadcast_message($donald_s,'amdtardata',$st,$pos,$data,$digest); - } -} - -sub push_file { - my ($donald_s,$filename)=@_; - - $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; - - my $st=Donald::FileInfo->lstat($filename); - defined $st or return warn "$filename: $!\n"; - my $rpc; - if ($st->type eq 'F') { - $rpc='filedata'; - $st->size<=80000 or die "$filename: to big for broadcast (max 80000 bytes)\n"; - if ($st->size==0) { - udp_broadcast_message($donald_s,$rpc,$st,0,''); - return; - } - - my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; - my $i=0; - for (my $pos=0;$pos<$st->size;$pos+=1024) { - my $data; - defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; - # warn "send bytes $pos to ",$pos+length($data),"\n"; - udp_broadcast_message($donald_s,$rpc,$st,$pos,$data); - ++$i % $BC_RATE or sleep 1; - } - } elsif ($st->type eq 'L') { - $rpc='filedata.2'; - udp_broadcast_message($donald_s,$rpc,$st,0,''); - return; - } else { - die "file type not supported\n"; - } - -} - our %CMD = ( 'mkmotd' => '/usr/sbin/mkmotd.pl', 'flush-gidcache' => 'date -d tomorrow +%s > /proc/net/rpc/auth.unix.gid/flush', @@ -866,34 +650,6 @@ sub send_exec { udp_broadcast_message($donald_s,'exec',$cmd); } -sub udp_rx_exec { - my ($cmd)=@_; - - warn "exec $cmd\n"; - exists $CMD{$cmd} or return; - - my $pid; - $pid=fork; - unless (defined $pid) { - warn "$!\n"; - return; - } - unless ($pid) { - $pid=fork; - defined $pid or exit 1; - $pid and exit; - - open STDIN,'<','/dev/null'; - open STDOUT,'>','/dev/null'; - open STDERR,'>','/dev/null'; - alarm(60); - chdir '/'; - exec '/bin/sh','-c',$CMD{$cmd}; - exit 1; - } - wait; -} - sub udp_rx_exec2 { my @cmd = @_; my $pid = fork; @@ -902,7 +658,6 @@ sub udp_rx_exec2 { return; } if ($pid == 0) { - open STDIN,'<','/dev/null'; alarm(60); chdir '/'; for my $cmd (@cmd) { @@ -915,74 +670,6 @@ sub udp_rx_exec2 { #------------------------------------------------------------- -sub normalize_seg { # [pos,len],[pos,len],... - my @s=sort {$a->[0] <=> $b->[0]} @_; - - my $i=0; - while ($i<$#s) { - # is element $i joinable with next element - - my $end_0=$s[$i]->[0]+$s[$i]->[1]; - if ($end_0 >= $s[$i+1]->[0] ) { - my $end_1=$s[$i+1]->[0]+$s[$i+1]->[1]; - $s[$i]->[1] = ($end_0>$end_1 ? $end_0 : $end_1)-$s[$i]->[0]; - splice @s,$i+1,1; - } else { - $i++; - } - } - return @s; -} - -my %RECEIVER; # ( filename => $receiver, .... ) - -# $receiver : [ st_want , last_rx , io_handle , [ [pos,len] , [pos,len] , ... ] ] - -sub purge_old_receiver { - while (my ($n,$v)=each %RECEIVER) { - if ($v->[1]+10[0]->name,"\n"; - log_to_stat_target('timeout receiving ',$v->[0]->name); - delete $RECEIVER{$n}; - } - } - My::Select::timeout_requeue(60); -} - -#------------------------------------------------------------- - -our $INSTALLED_DIGEST=''; - -our $rx_filedata_done; - -sub udp_rx_amdtardata { - my ($st_want,$pos,$data,$digest)=@_; - - ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo'; - - ### $digest eq $INSTALLED_DIGEST and $pos==0 and warn "/etc/amd - ",Digest::MD5::md5_hex($digest)," already installed\n"; - $digest eq $INSTALLED_DIGEST and return; - - #### $pos==0 and warn "receiving /etc/amd - ",Digest::MD5::md5_hex($digest),"\n"; - - udp_rx_filedata($st_want,$pos,$data); - if ($rx_filedata_done) { - my $pid=fork; - defined $pid or return warn "$!\n"; - unless($pid) { - chdir '/etc/amd' or die "/etc/amd: $!\n"; - exec 'tar','xzf',$st_want->name; - die "$!\n"; - } - } - wait; - $? and return; - - warn "installed /etc/amd - ",Digest::MD5::md5_hex($digest),"\n"; - $INSTALLED_DIGEST=$digest; - system '/sbin/make-automaps'; -} - our ($machine,$SYS_lchown,$SYS_mknod); our ($SYS_utimensat,$AT_FDCWD,$UTIME_OMIT,$AT_SYMLINK_NOFOLLOW); @@ -1020,108 +707,6 @@ sub lmtime { syscall($SYS_utimensat,$AT_FDCWD,$path,$tsa,$AT_SYMLINK_NOFOLLOW)==0 or return warn "$path: failed to lmtime: $!\n"; } -sub udp_rx_filedata { - -# set rx_filedata_done as a side effect - - my ($st_want,$pos,$data)=@_; - - ref($st_want) eq 'My::FileInfo' and bless $st_want,'Donald::FileInfo'; - - my $filename=$st_want->name; - my $tmp_filename="$filename.tmp"; - - $rx_filedata_done=0; - - my $st_is=Donald::FileInfo->lstat($st_want->name); - if ($st_is && $st_is->type eq 'F' && $st_is->size==$st_want->size && $st_is->mtime==$st_want->mtime) { - #### $pos==0 and warn " $filename seems to be current\n"; - return; - } - - if ($st_want->type eq 'L') { - if (!$st_is || $st_is->type ne 'L' || $st_is->target ne $st_want->target) { - $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n"); - symlink($st_want->target,$filename) or return warn "$filename: failed to create symlink: $!\n"; - lchown($st_want->uid,$st_want->gid,$filename); - lmtime($st_want->mtime,$filename); - warn "installed $filename -> ".$st_want->target."\n"; - } else { - if ($st_is->uid != $st_want->uid || $st_is->gid != $st_want->gid) { - lchown($st_want->uid,$st_want->gid,$filename); - } - if ($st_is->mtime != $st_want->mtime) { - lmtime($st_want->mtime,$filename); - } - } - return; -} - - if (length($data) == $st_want->size) { - # complete file in one broadcast - -e $tmp_filename and unlink($tmp_filename); - my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - if ($st_want->size) { - $fh->syswrite($data) or return warn "$tmp_filename: $!\n"; - $fh->sync(); - } - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; - warn "installed $filename\n"; - $rx_filedata_done=1; - return; - } - - length($data) or return; # shouldn't happen. - - my $receiver=$RECEIVER{$st_want->name}; - - if (defined $receiver) { - if ( $receiver->[0]->size != $st_want->size or $receiver->[0]->mtime != $st_want->mtime ) { - $receiver=undef; - } - } - - unless (defined $receiver) { - # create new receiver - ## warn "start receiving $filename from $udp_peer_addr\n"; - -e $tmp_filename and unlink($tmp_filename); - my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - $receiver = [$st_want,My::Select::time,$fh,[]]; - $RECEIVER{$filename}=$receiver; - } - - { - # warn "$filename: receive $pos length ",length($data),"\n"; - - # write data ( size cant be 0 here ) - $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n"; - $receiver->[2]->print($data) or return warn "$tmp_filename: $!\n";; - $receiver->[1]=My::Select::time; - my $s=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])]; - - #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n"; - - # all there ? - - if (@$s == 1 && $s->[0]->[0]==0 && $s->[0]->[1]==$st_want->size) { - $receiver->[2]->flush(); - $receiver->[2]->sync(); - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; - warn "installed $filename\n"; - delete $RECEIVER{$filename}; - $rx_filedata_done=1; - } - } -} - #----------------------------------------------------------- sample running proc every 10 seconds our @proc_running=(0)x12; # 12 samples every 5 seconds -> minute average @@ -1228,27 +813,6 @@ sub udp_rx_restart { exit 40; } -sub udp_rx_flush_gidcache { - if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') { - print $out time(); - } else { - warn "proc/net/rpc/auth.unix.gid/flush: $!\n"; - } -} - -sub udp_rx_make_automaps { - if (open my $out,'>','/proc/net/rpc/auth.unix.gid/flush') { - print $out time(); - } else { - warn "proc/net/rpc/auth.unix.gid/flush: $!\n"; - } - system '/sbin/make-automaps'; -} - -sub udp_rx_reexport { - system '/usr/bin/mxmount --reexport-only'; -} - #----------- tcp mgmt console ----------------------------- our $MGMT_PORT=234; @@ -1368,35 +932,34 @@ sub clp_rx_LSOF { return; } unless ($pid) { - my $pid=fork; - defined $pid or die "$!\n"; - unless ($pid) { - $socket->blocking(1); - # -n inhibits the conversion of network numbers to host names for network files. - # -b causes lsof to avoid kernel functions that might block - lstat(2), readlink(2), and stat(2). - # -w disables warning messages. - open P,'timeout -k 92s 90s lsof -n -b -w|' or die "$!\n"; - while (

) { - next if defined $pattern && index($_,$pattern)<0; - $socket->send(pack('n',length($_)).$_,0); - } - close P; - if ($?) { - $_=sprintf("** lsof timout/error on %s\n",$my_hostname); - $socket->send(pack('n',length($_)).$_,0); - } - close $socket; - exit; + $socket->blocking(1); + # -n inhibits the conversion of network numbers to host names for network files. + # -b causes lsof to avoid kernel functions that might block - lstat(2), readlink(2), and stat(2). + # -w disables warning messages. + open P,'timeout -k 92s 90s lsof -n -b -w|' or die "$!\n"; + while (

) { + next if defined $pattern && index($_,$pattern)<0; + $socket->send(pack('n',length($_)).$_,0); + } + close P; + if ($?) { + $_=sprintf("** lsof timout/error on %s\n",$my_hostname); + $socket->send(pack('n',length($_)).$_,0); } + close $socket; exit; } close $socket; - wait; - return 1; } -sub run_cmd { +sub clp_rx_CMD { my ($socket,@cmd)=@_; + + unless (is_trusted_ip($socket->peerhost())) { + warn "reveived command from untrusted host ". $socket->peerhost(). "\n"; + return; + } + my $pid=fork; unless (defined $pid) { warn"$!\n"; @@ -1411,18 +974,11 @@ sub run_cmd { warn "exec ".join(' ',@cmd)."\n"; $opipe->writer(); $epipe->writer(); - open STDIN,'<','/dev/null'; open STDOUT,'>&',$opipe; open STDERR,'>&',$epipe; exec @cmd; die "$!\n"; } -# $::SIG{'CHLD'}=sub { -# my $pid=wait; -# my $buffer="X$?"; -# $socket->send(pack('n',length($buffer)).$buffer,0); -# exit; -# }; $opipe->reader(); $epipe->reader(); my $ofn=$opipe->fileno; @@ -1467,6 +1023,7 @@ sub run_cmd { } } } + close $socket; } #----------- CLP cluster protocol ----------------------------- @@ -1536,25 +1093,6 @@ sub clp_send_message { # clp_send_message($socket, @args) $s->send(pack('n',length($data)).$data); } -sub clp_rx_CMD { - my ($socket,@args)=@_; - run_cmd($socket,@args); - close $socket; - return 1; -} - -# send_tcp_cp($socket,$cb,$timeout,@args) -# -# send a cluster protocoll message over an async tcp socket. -# -# assume $CLUSTER_PW is valid -# -sub send_tcp_cp { - my ($s,$cb,$timeout,@args)=@_; - my $data=sign($CLUSTER_PW,encode(@args)); - My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb); -} - sub send_tcp_cp_sync { my ($s, @args) = @_; my $data = sign($CLUSTER_PW, encode(@args)); @@ -1692,6 +1230,8 @@ sub expand_hostconfig_hosts { sub exec_at { my ($host,@cmd)=@_; + is_trusted_ip($my_ip) or die "This command only works on a trusted host\n"; + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; my $s=new IO::Socket::INET(PeerAddr=>$host,PeerPort=>$CLP_PORT); unless (defined $s) { @@ -1841,92 +1381,6 @@ sub clp_rx_PULL { My::Select::writer($s,$cb_write); } -sub udp_rx_push { - my ($ip,$st_want)=@_; - - my $filename=$st_want->name; - my $tmp_filename="$filename.tmp"; - - $ip eq $my_ip and return; - - my $st_is=Donald::FileInfo->lstat($st_want->name); - - unless ($st_want->type eq 'F') { - warn "$filename: type ".$st_want->type." not yet implemented\n"; - return; - } - - if ($st_is - && $st_is->type eq 'F' - && $st_is->size == $st_want->size - && $st_is->mtime == $st_want->mtime - && $st_is->uid == $st_want->uid - && $st_is->gid == $st_want->gid - && $st_is->perm == $st_want->perm - ) { - warn "$filename: already okay\n"; - return; - } - - if ($st_want->size==0) { - -e $tmp_filename and unlink($tmp_filename); - my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - # no need to fsync empty file - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n";; - warn "installed (empty) $filename\n"; - return; - } - - my $s; - $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,$TCP_TIMEOUT,sub { - $! and return warn "$ip: $!\n"; - send_tcp_cp($s,sub { - $! and return warn "$ip: $!\n"; - -e $tmp_filename and unlink($tmp_filename); - my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); - defined $fh or return warn "$tmp_filename: $!\n"; - - my $cb; - my $bytes=$st_want->size; - $cb=sub { - # note, we need to break the circular references $cb of our caller, if no longer needed - my ($buf)=@_; - if ($!) { warn "$ip: $!\n";$cb=undef;return; } - if (length($buf)==0) { warn "$ip: EOF\n";$cb=undef;return;} - $fh->print($buf) or return warn "$tmp_filename: $!\n"; - $bytes-=length($buf); - if ($bytes>0) { - My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); - return; - } - $cb=undef; - $fh->flush(); - $fh->sync(); - chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; - chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; - utime($st_want->mtime,$st_want->mtime,$tmp_filename); - rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; - warn "installed $filename\n"; - }; - My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); - },$TCP_TIMEOUT,'PULL',$st_want); - }); -} - -our %TRUSTED_IP = ( - '141.14.28.170' => 1, # afk - '141.14.16.131' => 1, # wtf -); - -sub is_trusted_ip { - my ($ip) = @_; - return exists $TRUSTED_IP{$ip} ? 1 : 0; -} - sub udp_rx_push2 { my ($ip, $st_ary, $post_ary) = @_; @@ -2020,7 +1474,6 @@ FILE: } } } - open STDIN, '<', '/dev/null'; chdir '/'; alarm(60); for my $cmd (@$post_ary) { @@ -2065,6 +1518,34 @@ sub cmd_exec { udp_broadcast_message($donald_s, 'exec.2', @cmd); } +sub cmd_daemon() { + $SIG{PIPE}='IGNORE'; + open STDIN,'<','/dev/null'; + + $donald_s=new My::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n"; + $donald_s->receive_data(\&udp_message,$donald_s); + + openlog('clusterd','pid','daemon'); + Sys::Syslog::setlogsock('unix'); # with 'native' we get EOLs in the logfile, option "noeol" doesn't work + + check_progfile_status(); + warn "server started - ".version_info()."\n"; + init_area(); + mgmt_init(); + clp_init(); + trustcheck_init(); + + sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n"; + + My::Select::timeout(rand(60),\&send_stat); + My::Select::timeout(0,\&sample_rproc); + $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init(); + $my_hostname eq 'macheteinfach' and My::NetlogReceiver::init(); + My::Select::timeout(600,\&check_overload); + + My::Select::run(); +} + #------------------------------------------------------------ our $TRUSTCHECK_PORT=236; @@ -2094,24 +1575,18 @@ sub trustcheck_connect_request { use constant USAGE => <<'__EOF__'; -usage: $0 [options] - - --push file # broadcast this file - --push-amd-tar # broadcast /etc/amd - --send-restart # broadcast a restart request to all nodes - --exec mkmotd # execute /usr/sbin/mkmotd.pl on all nodes - --exec @node cmd [args...] # execute cmd on node - --flush-gidcache # flush rpc auth.unix.gid cache on all nodes - --make-automaps # execute /usr/sbin/make-automaps on all nodes - --reexport # execute /usr/bin/mxmount --reexport-only on all nodes +usage: $0 cmd [options] args... - --lsof=pattern +Commands: - --daemon # start a daemon - - push [--post CMD] files... # push files over tcp + push [--post CMD] files... # push files over tcp exec CMD... # execute CMD on all nodes - CMD : mkmotd | flush-gidcache | reexport | make-automaps + exec @host cmd... # execute (any) command on host + lsof pattern # list open file matching pattern on all nodes + send-restart # send restart request to all nodes + daemon # run the daemon + + CMD: mkmotd | flush-gidcache | reexport | make-automaps __EOF__ @@ -2131,67 +1606,23 @@ GetOptions ( ) or die USAGE; if (defined $options{'push'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - push_file($donald_s,$options{'push'}); + die ("`clusterd --push` is obsolete. Please use `clusterd push`\n"); } elsif (defined $options{'exec'}) { - if (substr($options{'exec'},0,1) eq '@') { - length(length($options{'exec'})>1) or die USAGE; - @ARGV>=1 or die USAGE; - exec_at(substr($options{'exec'},1),@ARGV); - } else { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - send_exec($donald_s,$options{'exec'}); - } + die ("`clusterd --exec` is obsolete. Please use `clusterd push --post` or `clusterd exec`\n"); } elsif (defined $options{'push_amd_tar'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - push_amd_tar($donald_s); + die ("`clusterd --push-amd-tar` is obsolete. Please use `clusterd push`\n"); } elsif (defined $options{'send-restart'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'restart'); + die ("`clusterd --send-restart` is obsolete. Please use `clusterd send-restart`\n"); } elsif (defined $options{'flush-gidcache'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'flush-gidcache'); + die ("`clusterd --flush-gidcache` is obsolete. Please use `clusterd push --post flush-gidcache` or `clusterd exec flush-gidcache`\n"); } elsif (defined $options{'make-automaps'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'make-automaps'); + die ("`clusterd --make-automaps` is obsolete. Please use `clusterd push --post make-automaps` or `clusterd exec make-automaps`\n"); } elsif (defined $options{'reexport'}) { - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; - udp_broadcast_message($donald_s,'reexport'); + die ("`clusterd --reexport` is obsolete. Please use `clusterd push --post reexport` or `clusterd exec reexport`\n"); } elsif (defined $options{'daemon'}) { - $SIG{PIPE}='IGNORE'; - - $donald_s=new My::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n"; - $donald_s->receive_data(\&udp_message,$donald_s); - - openlog('clusterd','pid','daemon'); - Sys::Syslog::setlogsock('unix'); # with 'native' we get EOLs in the logfile, option "noeol" doesn't work - - check_progfile_status(); - warn "server started - ".version_info()."\n"; - init_area(); - mgmt_init(); - clp_init(); - trustcheck_init(); - - sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n"; - - My::Select::timeout(60,\&purge_old_receiver); - My::Select::timeout(rand(60),\&send_stat); - My::Select::timeout(0,\&sample_rproc); - $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init(); - $my_hostname eq 'macheteinfach' and My::NetlogReceiver::init(); - My::Select::timeout(600,\&check_overload); - - My::Select::run(); + die ("`clusterd --daemon` is obsolete. Please use `clusterd daemon`\n"); } elsif ($options{'lsof'}) { - lsof($options{'lsof'}); + die ("`clusterd --lsof` is obsolete. Please use `clusterd lsof`\n"); } else { @ARGV or die USAGE; my ($cmd,@args)=@ARGV; @@ -2200,7 +1631,24 @@ if (defined $options{'push'}) { cmd_push($options{'post'} || [], @args); } elsif ($cmd eq 'exec') { @args > 0 or die USAGE; - cmd_exec(@args); + if (substr($args[0], 0, 1) eq '@') { + my $host = substr(shift @args, 1); + @args > 0 or die USAGE; + exec_at($host ,@args); + } else { + cmd_exec(@args); + } + } elsif ($cmd eq 'lsof') { + @args == 1 or die USAGE; + lsof(@args); + } elsif ($cmd eq 'send-restart') { + @args == 0 or die USAGE; + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s,'restart'); + } elsif ($cmd eq 'daemon') { + @args == 0 or die USAGE; + cmd_daemon(); } else { die USAGE; } diff --git a/clusterd/clusterd.service b/clusterd/clusterd.service index 99125df..d3397bc 100644 --- a/clusterd/clusterd.service +++ b/clusterd/clusterd.service @@ -1,5 +1,5 @@ [Service] -ExecStart=/usr/sbin/clusterd --daemon +ExecStart=/usr/sbin/clusterd daemon Restart=always RestartSec=10s