From d7e9bc5357eb98a4279b61e6a7a43b0fb5c83116 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Tue, 21 Jun 2016 09:16:02 +0200 Subject: [PATCH] clusterd: style: remove some extra whitespace --- clusterd/clusterd | 153 +++++++++++++--------------------------------- 1 file changed, 41 insertions(+), 112 deletions(-) diff --git a/clusterd/clusterd b/clusterd/clusterd index f97ce68..386daa9 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -16,7 +16,6 @@ use Donald::Select::INET; use POSIX; use IO::Pipe; - #-------------------------------------- package My::Cluster::Updown; @@ -24,7 +23,6 @@ package My::Cluster::Updown; # monitor nodes # - our %H; # ( name=> [state , last_seen , expect_seq , @data ] , ... ) our $MONITOR_STARTING=1; @@ -37,7 +35,6 @@ our %DUPLICATES; # 3: processor capacity # 4: string unix version - use Storable; sub save_state { @@ -45,25 +42,23 @@ sub save_state { } sub restore_state { - -e '/root/clusterd.monitor.state' and %H=%{retrieve '/root/clusterd.monitor.state'}; for (values %H) { $_->[1]=0; } } - sub status { my ($f)=@_; my (@up,@down); for (sort keys %H) { push @up, $_ if $H{$_}->[0] eq 'UP' } for (sort keys %H) { push @down,$_ if $H{$_}->[0] eq 'DOWN' } - + $f->printf("UP (%d): %s\n\n",scalar(@up),join(' ',@up)); $f->printf("DOWN (%d): %s\n\n",scalar(@down),join(' ',@down)); my (@k,$max); - + @k=sort({$H{$b}->[3] <=> $H{$a}->[3]} grep({$H{$_}->[0] eq 'UP'} (keys %H))); $max=0; $f->print("TOP 10: "); @@ -127,7 +122,6 @@ sub timeout_hosts { save_state(); } - sub rx_hostannounce { # ($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev) @@ -138,7 +132,7 @@ sub rx_hostannounce { unless (exists $H{$host}) { if ($seq==0) { msg_state('NEW','UP',$host,"discovered new node $version - $unixrev"); - } else { + } else { $MONITOR_STARTING or msg_state('NEW','UP',$host,"discovered running node $version - $unixrev"); } $H{$host}=['UP',Donald::Select::time,$seq+1,@more]; @@ -179,7 +173,6 @@ sub delete_host { msg_text("host $host removed from monitor"); } - #----------------------------------------------------------------------- package My::NetlogReceiver; @@ -194,7 +187,6 @@ sub day { return sprintf "%04d%02d%02d",$f[5]+1900,$f[4]+1,$f[3]; } - sub bigben { my $day=day(); $day le $DAY_LAST_MSG and return; @@ -207,14 +199,12 @@ sub bigben_timer { Donald::Select::timeout_requeue(30); } - sub bigben_init { $DAY_LAST_MSG=day(); Donald::Select::timeout(30,\&bigben_timer); } - sub receive { my ($socket,$peernode,$bufref)=@_; my $data; @@ -226,7 +216,7 @@ sub receive { length $data or return; $$bufref.=$data; - + while (1) { last if length($$bufref)<2; my $l=unpack('n',$$bufref); @@ -240,7 +230,6 @@ sub receive { Donald::Select::reader_requeue(); } - sub connect_request { Donald::Select::reader_requeue(); @@ -253,15 +242,12 @@ sub connect_request { # warn "$peernode: connect\n"; } - sub init { $listen_socket=new IO::Socket::INET(Proto=>'tcp',LocalPort=>1028,Listen=>1,ReuseAddr=>1) or die "$!\n"; Donald::Select::reader($listen_socket,\&connect_request); bigben_init(); } - - #------------------------------------------------------------------------ package main; use strict; @@ -284,12 +270,11 @@ $my_hostname=lc `/bin/hostname`; chomp($my_hostname); $my_hostname =~ s/\.molgen\.mpg\.de$//; - while (1) { my $addr=inet_aton($my_hostname); if(defined $addr) { $my_ip=inet_ntoa(inet_aton($my_hostname)); - } + } last if defined $my_ip; my $once; unless ($once) { @@ -299,7 +284,6 @@ while (1) { sleep 30; } - our $my_unixrev; $my_unixrev=`uname -r`; chomp($my_unixrev); @@ -309,11 +293,9 @@ our $CLUSTER_PW_FILE='/etc/clusterd.password'; our $OLD_CLUSTER_PW_FILE='/root/clusterd.password'; our $CLUSTER_PW_TIMESTAMP=0; - $ENV{'PATH'} = '/usr/local/bin:/sbin:/usr/sbin:/bin'.($ENV{PATH}?':'.$ENV{PATH}:''); # for amq , ps , tar (gnu!) - -#---------------------------------------------------------- UDP +#---------------------------------------------------------- UDP our ($udp_peer_addr,$udp_peer_port); # ('141.14.12.12',1234) @@ -328,33 +310,29 @@ our %UDP_HANDLER= 'exec' => \&udp_rx_exec, ); - sub udp_message { my ($data,$x_udp_peer_addr,$x_udp_peer_port,$donald_s)=@_; - + ($udp_peer_addr,$udp_peer_port)=($x_udp_peer_addr,$x_udp_peer_port); - + defined $CLUSTER_PW or return; my ($handler_name,@args)=decode(check_sign($CLUSTER_PW,$data)) or return; $UDP_HANDLER{$handler_name}->(@args) if exists $UDP_HANDLER{$handler_name}; } - sub udp_send_message { # udp_send_message( dst, @args) # dst='141.14.31.255' 'zork' '141.14.16.1' etc. data is anything my ($ip,@args)=@_; defined $CLUSTER_PW or return; $donald_s->send_data($ip,$UDP_PORT,sign($CLUSTER_PW,encode(@args))); } - - #---------------------------------------------------------- sub push_amd_tar { my ($donald_s)=@_; my $filename='/tmp/amd.tar'; - + my $pid=fork; defined $pid or return warn "$!\n"; unless($pid) { @@ -365,8 +343,7 @@ sub push_amd_tar { wait; $? and return; - - my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; + my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; my $digest=Digest::MD5->new->addfile($fh)->digest; warn "tar digest is ",Digest::MD5::md5_hex($digest),"\n"; @@ -380,7 +357,7 @@ sub push_amd_tar { $? and return; $filename='/tmp/amd.tar.gz'; - + my $st=Donald::FileInfo->lstat($filename); defined $st or return warn "$filename: $!\n"; $st->type eq 'F' or return warn "$filename: not a plain file\n"; @@ -397,35 +374,31 @@ sub push_amd_tar { } } - - sub push_file { my ($donald_s,$filename)=@_; - + $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; - + my $st=Donald::FileInfo->lstat($filename); defined $st or return warn "$filename: $!\n"; $st->type eq 'F' or return warn "$filename: not a plain file\n"; $st->size<=40960 or return warn "$filename: to big for broadcast (max 40960 bytes)\n"; - my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; + my $fh=new IO::File $filename,'<' or return warn "$filename: $!\n"; my $i=0; for (my $pos=0;$pos<$st->size;$pos+=1024) { my $data; defined $fh->sysread($data,1024) or return warn "$filename: $!\n"; # warn "send bytes $pos to ",$pos+length($data),"\n"; udp_broadcast_message($donald_s,'filedata',$st,$pos,$data); - ++$i % $BC_RATE or sleep 1; + ++$i % $BC_RATE or sleep 1; } } - our %CMD=( 'mkmotd'=>'/usr/sbin/mkmotd.pl', ); - sub send_exec { my ($donald_s,$cmd)=@_; unless (exists $CMD{$cmd}) { @@ -434,7 +407,6 @@ sub send_exec { udp_broadcast_message($donald_s,'exec',$cmd); } - sub udp_rx_exec { my ($cmd)=@_; @@ -451,7 +423,7 @@ sub udp_rx_exec { $pid=fork; defined $pid or exit 1; $pid and exit; - + open STDIN,'<','/dev/null'; open STDOUT,'>','/dev/null'; open STDERR,'>','/dev/null'; @@ -463,15 +435,11 @@ sub udp_rx_exec { wait; } - - #------------------------------------------------------------- - - sub normalize_seg { # [pos,len],[pos,len],... my @s=sort {$a->[0] <=> $b->[0]} @_; - + my $i=0; while ($i<$#s) { # is element $i joinable with next element @@ -488,13 +456,10 @@ sub normalize_seg { # [pos,len],[pos,len],... return @s; } - -my %RECEIVER; # ( filename => $receiver, .... ) - +my %RECEIVER; # ( filename => $receiver, .... ) # $receiver : [ st_want , last_rx , io_handle , [ [pos,len] , [pos,len] , ... ] ] - sub purge_old_receiver { while (my ($n,$v)=each %RECEIVER) { if ($v->[1]+10lstat($st_want->name); if ($st_is && $st_is->type eq 'F' && $st_is->size==$st_want->size && $st_is->mtime==$st_want->mtime) { #### $pos==0 and warn " $filename seems to be current\n"; return; - } + } if (length($data) == $st_want->size) { # complete file in one broadcast @@ -575,10 +540,9 @@ sub udp_rx_filedata { $rx_filedata_done=1; return; } - - length($data) or return; # shouldn't happen. - - + + length($data) or return; # shouldn't happen. + my $receiver=$RECEIVER{$st_want->name}; if (defined $receiver) { @@ -599,7 +563,7 @@ sub udp_rx_filedata { { # warn "$filename: receive $pos length ",length($data),"\n"; - + # write data ( size cant be 0 here ) $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n"; $receiver->[2]->syswrite($data) or return warn "$tmp_filename: $!\n";; @@ -609,7 +573,7 @@ sub udp_rx_filedata { #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n"; # all there ? - + if (@$s == 1 && $s->[0]->[0]==0 && $s->[0]->[1]==$st_want->size) { $receiver->[2]->close; chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; @@ -624,8 +588,6 @@ sub udp_rx_filedata { } } - - #----------------------------------------------------------- sample running proc every 10 seconds our @proc_running=(0)x12; # 12 samples every 5 seconds -> minute average @@ -662,7 +624,6 @@ sub running_proc_10 { return $ret/@proc_running_10; } - sub sample_rproc { # every 5 seconds @proc_running=(@proc_running[1..@proc_running-1],get_cur_running_proc()-1); if ($SAMPLE_TICK<12) { @@ -674,14 +635,8 @@ sub sample_rproc { # every 5 seconds Donald::Select::timeout_requeue(5); } - - - #----------------------------------------------------------- stat - -#----------------------------------------------------------- - our ($CPUS,$BOGOMIPS); sub init_cpuinfo { @@ -729,9 +684,6 @@ sub udp_rx_loadavg2 { $my_hostname eq $STAT_TARGET and My::Cluster::Updown::rx_hostannounce($hostname,$seq,$load_avg,$opt_version,$opt_pload,$opt_pcapacity,$opt_unixrev) } - - - sub udp_rx_log { my ($msg)=@_; My::Cluster::Updown::msg_text($msg); @@ -742,7 +694,6 @@ sub log_to_stat_target { udp_send_message($STAT_TARGET,'log',"$my_hostname: $msg"); } - # ---------------------------------------------------------- sub udp_rx_restart { @@ -787,7 +738,7 @@ sub mgmt_connect_request { my $socket=$mgmt_listen_socket->accept(); $socket->blocking(0); my $peernode=$socket->peerhost; - + ### warn "accepted mgmt connection from $peernode\n"; Donald::Select::reader($socket,\&mgmt_receive,$socket); @@ -814,7 +765,7 @@ sub mgmt_receive { $my_hostname eq $STAT_TARGET and My::Cluster::Updown::status($s); $s->print("AREA: ",area_config_as_string(),"\n"); $s->print("STAT TARGET: ",$STAT_TARGET,"\n"); - $s->print(' (',scalar(localtime),')',"\n"); + $s->print(' (',scalar(localtime),')',"\n"); } elsif ($data eq 'r') { for my $host (sort keys %H) { @@ -855,7 +806,7 @@ r : dump unix revisions delete HOST : forget about HOST dup show : show duplicates stat -dup clear : clear duplicates stat +dup clear : clear duplicates stat to exit use ^D @@ -863,12 +814,11 @@ _EOF_ } } - sub mgmt_print_all { my ($msg)=@_; $options{'foreground'} and not $options{'syslog'} and print $msg; - + for my $s (values(%mgmt_sockets)) { $s->print($msg); } @@ -876,9 +826,6 @@ sub mgmt_print_all { #----------------------------------------------------------- - - - sub clp_rx_LSOF { my ($socket,$pattern)=@_; @@ -886,8 +833,8 @@ sub clp_rx_LSOF { unless (defined $pid) { warn"$!\n"; return; - } - unless ($pid) { + } + unless ($pid) { my $pid=fork; defined $pid or die "$!\n"; unless ($pid) { @@ -914,7 +861,7 @@ sub run_cmd { unless (defined $pid) { warn"$!\n"; return; - } + } unless ($pid) { my $opipe=new IO::Pipe; my $epipe=new IO::Pipe; @@ -934,7 +881,7 @@ sub run_cmd { # my $pid=wait; # my $buffer="X$?"; # $socket->send(pack('n',length($buffer)).$buffer,0); -# exit; +# exit; # }; $opipe->reader(); $epipe->reader(); @@ -979,12 +926,9 @@ sub run_cmd { exit; } } - - } + } } - - #----------- CLP cluster protocol ----------------------------- our $CLP_PORT=235; @@ -1009,7 +953,7 @@ sub clp_connect_request { my $socket=$clp_listen_socket->accept(); $socket->blocking(0); my $peernode=$socket->peerhost; - + my $buffer=''; Donald::Select::reader($socket,\&clp_receive,$socket,\$buffer); @@ -1046,7 +990,6 @@ sub clp_message { $CLP_HANDLER{$handler_name}->($socket,@args) if exists $CLP_HANDLER{$handler_name}; } - sub clp_send_message { # clp_send_message($socket, @args) my ($s,@args)=@_; defined $CLUSTER_PW or return; @@ -1057,7 +1000,6 @@ sub clp_send_message { # clp_send_message($socket, @args) $s->send(pack('n',length($data)).$data); } - sub clp_rx_CMD { my ($socket,@args)=@_; run_cmd($socket,@args); @@ -1065,7 +1007,6 @@ sub clp_rx_CMD { return 1; } - #---------------------------------------------------------- #our $CLUSTER_PW; @@ -1073,7 +1014,6 @@ sub clp_rx_CMD { #our $OLD_CLUSTER_PW_FILE='/root/clusterd.password'; #our $CLUSTER_PW_TIMESTAMP=0; - sub sync_cluster_pw { my $st=Donald::FileInfo->lstat($CLUSTER_PW_FILE); @@ -1115,22 +1055,18 @@ sub sync_cluster_pw { return defined $CLUSTER_PW; } - #------------------------------------------------------------ # area routing - our %AREA_ROUTER= ( lol => '141.14.31.255', # orkrist=> '10.14.0.255', ); - our $area_socket; - sub init_area { exists $AREA_ROUTER{$my_hostname} or return; warn "I am area router for $AREA_ROUTER{$my_hostname}\n"; @@ -1150,7 +1086,6 @@ sub area_message { Donald::Select::reader_requeue(); } - sub udp_broadcast_message { my ($donald_s,@args)=@_; @@ -1268,7 +1203,7 @@ sub lsof { unless (defined $s) { warn "$host: $!\n"; next; - } + } clp_send_message($s,'LSOF',$pattern); my $pbuffer=''; my $olbuffer=''; @@ -1278,7 +1213,6 @@ sub lsof { Donald::Select::run() if $slave;; } - sub lsof_rx { my ($host,$s,$pbufref,$olbufref)=@_; my $data; @@ -1314,7 +1248,6 @@ sub lsof_msg { ($$olbufref)=$msg=~/([^\n]*)\z/; } - sub cmd_rx { my ($host,$s,$pbufref,$olbufref,$elbufref)=@_; my $data; @@ -1365,10 +1298,8 @@ sub cmd_msg { } } - #------------------------------------------------------------ - use constant USAGE => <<'__EOF__'; usage: $0 [options] @@ -1378,11 +1309,11 @@ usage: $0 [options] --send-restart # broadcast a restart request to all nodes --exec mkmotd # execute /usr/sbin/mkmotd.pl on all nodes --flush-gidcache # flush rpc auth.unix.gid cache on all nodes - + --lsof=pattern --kill # try to kill a running server - + --daemon # start a daemon --kill # try to kill previous server first --foreground # stay in foreground, log to stderr @@ -1390,7 +1321,6 @@ usage: $0 [options] __EOF__ - use Getopt::Long; GetOptions ( @@ -1407,7 +1337,6 @@ GetOptions ) or die USAGE; - if (defined $options{'push'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; @@ -1459,7 +1388,7 @@ if (defined $options{'push'}) { clp_init(); sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n"; - + Donald::Select::timeout(60,\&purge_old_receiver); Donald::Select::timeout(rand(60),\&send_stat); Donald::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha; @@ -1468,7 +1397,7 @@ if (defined $options{'push'}) { Donald::Select::timeout(600,\&check_overload); Donald::Select::timeout(30,\&ping_mlx); - Donald::Select::run(); + Donald::Select::run(); } elsif ($options{'lsof'}) { lsof($options{'lsof'}); } elsif ($options{'kill'}) {