diff --git a/clusterd b/clusterd index 624ec61..97867d9 100755 --- a/clusterd +++ b/clusterd @@ -10,11 +10,283 @@ our $REVISION='1.108'; #use lib ('/home/buczek/cluster/Donald/blib/lib'); use Donald::Tools qw(encode sign check_sign decode); -use Donald::Select; use Donald::FileInfo; -use Donald::Select::INET; use POSIX; use IO::Pipe; +use Digest::MD5; + +#------------------------------------- +package My::Select; + +our $time=Donald::Tools::uptime(); + +sub My::Select::time +{ + return $time; +} + +our @TIMER=(); # ( [duetime,cb] , ... ) sorted by time +our $active_timer_cb; + +sub timeout # cb=Select::timeout(seconds,cb) +{ + my ($delta,$cb)=@_; + my $duetime=$time+$delta; + @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); + return $cb; +} + +sub timeout_cancel # cb=Select::timeout_cancel(cb) +{ + my ($cb)=@_; + @TIMER=grep {$_->[1] != $cb} @TIMER; + return $cb; +} + +sub timeout_requeue {timeout(shift,$active_timer_cb); } # Select::timeout_requeue(seconds) + +#------------------------------------ + +our @READER; # ( [Handle,cb] , ... ) +our @WRITER; +our @EXCEPT; +our $active_io; # [Handle,cb] + +sub reader # cb = Select::reader(Handle,cb) +{ + my ($handle,$cb)=@_; + push @READER,[$handle,$cb]; + return $cb; +} + +sub writer # cb = Select::writer(Handle,cb) +{ + my ($handle,$cb)=@_; + push @WRITER,[$handle,$cb]; + return $cb; +} + +sub except # cb = Select::except(Handle,cb) +{ + my ($handle,$cb)=@_; + push @EXCEPT,[$handle,$cb]; + return $cb; +} + +sub reader_requeue {push @READER,$active_io} +sub writer_requeue {push @WRITER,$active_io} +sub except_requeue {push @EXCEPT,$active_io} + +sub cancel # $cb = Select::cancel(cb) +{ + my ($cb)=@_; + defined $cb or $cb=$active_io->[1]; + @READER=grep {$_->[1] != $cb} @READER; + @WRITER=grep {$_->[1] != $cb} @WRITER; + @EXCEPT=grep {$_->[1] != $cb} @EXCEPT; +} + +sub cancel_handle +{ + my ($handle)=@_; + @READER=grep {$_->[0] != $handle} @READER; + @WRITER=grep {$_->[0] != $handle} @WRITER; + @EXCEPT=grep {$_->[0] != $handle} @EXCEPT; +} + +sub run +{ + while (1) { + $time=Donald::Tools::uptime(); + while (@TIMER && $TIMER[0]->[0]<=$time) { + $active_timer_cb=(shift @TIMER)->[1]; + $active_timer_cb->(); + } + $active_timer_cb=undef; + + my ($rvec,$wvec,$evec)=('','',''); + + for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 } ; + for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 } ; + for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ; + + my $ready=select($rvec,$wvec,$evec,1); + if ($ready>0) { + for (my $i=0;$i<@READER;$i++) { + if (vec($rvec,$READER[$i]->[0]->fileno,1)) { + $active_io=splice @READER,$i,1; + $active_io->[1]->(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@WRITER;$i++) { + if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) { + $active_io=splice @WRITER,$i,1; + $active_io->[1]->(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@EXCEPT;$i++) { + if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) { + $active_io=splice @EXCEPT,$i,1; + $active_io->[1]->(); + $active_io=undef; + last; + } + } + } + } +} + +#-------------------------------------- +package My::Select::INET ; + +use Carp; +use IO::Socket::INET; + +our $UDP_MAX=1472; # for broadcast on alphas + +our $SOL_SOCKET=1; +our $SO_ERROR=4; + +sub get_socket_error +{ + my ($s)=@_; + return unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR)); +} + +sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) +{ + my ($class,@args)=@_; + our $socket=new IO::Socket::INET (@args) or return undef; + return bless \$socket,$class; +} + + +sub send_data +{ + my ($self,$ip,$port,$data)=@_; + my $ip_address=inet_aton($ip); + unless (defined $ip_address) {carp("can't resolve $ip\n");return undef} + unless (length($data)<=$UDP_MAX) {carp("message to long\n");return undef} + $$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n"; +} + + +sub reader +{ + my ($self,$sub)=@_; + My::Select::reader($$self,$sub); +} + +sub receive_data +{ + my ($self,$cb,@args)=@_; + + my $receive_data_cb=sub { + my $data; + my $peer = $$self->recv($data,$UDP_MAX); + my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); + my $udp_peer_addr=inet_ntoa($peer_iaddr); + My::Select::reader_requeue(); + $cb->($data,$udp_peer_addr,$udp_peer_port); + }; + My::Select::reader($$self,$receive_data_cb); +} + +# send_tcp($socket,$data,$timeout,$cb); +# +# send data asynchronously over noblocking tcp socket +# call callback when done ($!=0) or on error ($! set) +# +# all arguments required +# +sub send_tcp { + my ($s,$data,$timeout,$cb)=@_; + my $len=$s->send($data,0); + defined $len or return $cb->(); + if ($len==length($data)) { + $!=0; + $cb->(); + return; + } + my $pos=$len; + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(); + }; + my $cb_write=sub { + My::Select::timeout_cancel($cb_tmo); + my $len=send($s,substr($data,$pos),0); + defined $len or return $cb->(); + if ($len==length($data)-$pos) { + $!=0; + $cb->(); + return; + } + $pos+=$len; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer_requeue(); + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer($s,$cb_write); +} + +# $socket = connect_tcp ($ip,$port,$timeout,$cb) +# +# asynchronously connect to tcp socket. +# call callback when done or on error (with $! set) +# +# all arguments required +# +sub connect_tcp { + my ($ip,$port,$timeout,$cb)=@_; + + my $s=new IO::Socket::INET (PeerAddr=>$ip,PeerPort=>$port,Blocking=>0); + defined $s or return $cb->(); + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(); + }; + my $cb_write=sub { + My::Select::timeout_cancel($cb_tmo); + $!=get_socket_error($s); + $cb->(); + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer($s,$cb_write); + return $s; +} + +# read_with_timeout($socket,$callback,$timeout) +# +# asynchronously read from tcp socket. +# +sub read_with_timeout { + my ($s,$cb,$timeout)=@_; + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(undef); + }; + my $cb_read=sub { + My::Select::timeout_cancel($cb_tmo); + my $buf=''; + my $l=sysread($s,$buf,102400,0); + if (!defined $l) { + $cb->(undef); + } else { + $!=0; + $cb->($buf); + } + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::reader($s,$cb_read) +} #-------------------------------------- package My::Cluster::Updown; @@ -104,13 +376,13 @@ sub msg_state { sub init { restore_state; msg_text('node monitor: started. (recovery mode)'); - Donald::Select::timeout(630,sub{$MONITOR_STARTING=0;msg_text('node monitor: recovery finished');}); - Donald::Select::timeout(630,\&timeout_hosts); + My::Select::timeout(630,sub{$MONITOR_STARTING=0;msg_text('node monitor: recovery finished');}); + My::Select::timeout(630,\&timeout_hosts); } sub timeout_hosts { - Donald::Select::timeout_requeue(60); - my $timeout=Donald::Select::time()-1230; # 2x10 minutes + 30 seconds + My::Select::timeout_requeue(60); + my $timeout=My::Select::time()-1230; # 2x10 minutes + 30 seconds for (keys %H) { my $h=$H{$_}; @@ -135,7 +407,7 @@ sub rx_hostannounce { } else { $MONITOR_STARTING or msg_state('NEW','UP',$host,"discovered running node $version - $unixrev"); } - $H{$host}=['UP',Donald::Select::time,$seq+1,@more]; + $H{$host}=['UP',My::Select::time,$seq+1,@more]; } else { my $h=$H{$host}; if ($h->[0] eq 'UP') { @@ -163,7 +435,7 @@ sub rx_hostannounce { msg_state('DOWN','UP',$host,$seq-$h->[2]." packet(s) lost"); } } - @$h=('UP',Donald::Select::time,$seq+1,@more); + @$h=('UP',My::Select::time,$seq+1,@more); } } @@ -196,12 +468,12 @@ sub bigben { sub bigben_timer { bigben(); - Donald::Select::timeout_requeue(30); + My::Select::timeout_requeue(30); } sub bigben_init { $DAY_LAST_MSG=day(); - Donald::Select::timeout(30,\&bigben_timer); + My::Select::timeout(30,\&bigben_timer); } @@ -227,24 +499,24 @@ sub receive { $|=1; warn "NETLOG $msg\n" unless $msg=~/NETLOG/; } - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); } sub connect_request { - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); my $socket=$listen_socket->accept(); $socket->blocking(0); my $peernode=$socket->peerhost; my $buffer=''; - Donald::Select::reader($socket,\&receive,$socket,$peernode,\$buffer); + My::Select::reader($socket,sub{receive($socket,$peernode,\$buffer)}); # warn "$peernode: connect\n"; } sub init { $listen_socket=new IO::Socket::INET(Proto=>'tcp',LocalPort=>1028,Listen=>1,ReuseAddr=>1) or die "$!\n"; - Donald::Select::reader($listen_socket,\&connect_request); + My::Select::reader($listen_socket,\&connect_request); bigben_init(); } @@ -259,10 +531,11 @@ use Data::Dumper; our $UDP_MAX=1472; # for broadcast on alphas our $UDP_PORT=234; our $BC_RATE=8; # packets per second broadcast +our $TCP_TIMEOUT=30; # default timeout for tcp processing our (%options); # RUN OPTIONS -our $donald_s; # Donald::Select::INET udp socket +our $donald_s; # My::Select::INET udp socket our $my_hostname; our $my_ip; # '141.14.12.12' @@ -310,6 +583,7 @@ our %UDP_HANDLER= 'make-automaps' => \&udp_rx_make_automaps, 'log' => \&udp_rx_log, 'exec' => \&udp_rx_exec, + 'push' => \&udp_rx_push, ); sub udp_message { @@ -478,13 +752,13 @@ my %RECEIVER; # ( filename => $receiver, .... ) sub purge_old_receiver { while (my ($n,$v)=each %RECEIVER) { - if ($v->[1]+10[1]+10[0]->name,"\n"; log_to_stat_target('timeout receiving ',$v->[0]->name); delete $RECEIVER{$n}; } } - Donald::Select::timeout_requeue(60); + My::Select::timeout_requeue(60); } #------------------------------------------------------------- @@ -591,7 +865,7 @@ sub udp_rx_filedata { if ($st_want->type eq 'L') { if (!$st_is || $st_is->type ne 'L' || $st_is->target ne $st_want->target) { - $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n"); + $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n"); symlink($st_want->target,$filename) or return warn "$filename: failed to create symlink: $!\n"; lchown($st_want->uid,$st_want->gid,$filename); lmtime($st_want->mtime,$filename); @@ -639,7 +913,7 @@ sub udp_rx_filedata { -e $tmp_filename and unlink($tmp_filename); my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0); defined $fh or return warn "$tmp_filename: $!\n"; - $receiver = [$st_want,Donald::Select::time,$fh,[]]; + $receiver = [$st_want,My::Select::time,$fh,[]]; $RECEIVER{$filename}=$receiver; } @@ -649,7 +923,7 @@ sub udp_rx_filedata { # write data ( size cant be 0 here ) $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n"; $receiver->[2]->syswrite($data) or return warn "$tmp_filename: $!\n";; - $receiver->[1]=Donald::Select::time; + $receiver->[1]=My::Select::time; my $s=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])]; #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n"; @@ -714,7 +988,7 @@ sub sample_rproc { # every 5 seconds @proc_running_10=(@proc_running_10[1..@proc_running_10-1],running_proc()); $SAMPLE_TICK=0; } - Donald::Select::timeout_requeue(5); + My::Select::timeout_requeue(5); } #----------------------------------------------------------- stat @@ -755,7 +1029,7 @@ sub loadavg { # AXP : (system load average) , LINUX: (system load average, pl our $STAT_TARGET='lol'; our $STAT_SEQ=0; sub send_stat { - Donald::Select::timeout_requeue(600); + My::Select::timeout_requeue(600); my ($load_avg,$pload,$pcapacity)=loadavg(); defined $load_avg or return; udp_send_message($STAT_TARGET,'loadavg.2',$my_hostname,$STAT_SEQ++,$load_avg,version_info(),$pload,$pcapacity,$my_unixrev); @@ -817,12 +1091,12 @@ our %mgmt_sockets; sub mgmt_init { $mgmt_listen_socket=new IO::Socket::INET(LocalPort=>$MGMT_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); defined $mgmt_listen_socket or die "$!\n"; - Donald::Select::reader($mgmt_listen_socket,\&mgmt_connect_request); + My::Select::reader($mgmt_listen_socket,\&mgmt_connect_request); } sub mgmt_connect_request { - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); # listen socket ready @@ -832,7 +1106,7 @@ sub mgmt_connect_request { ### warn "accepted mgmt connection from $peernode\n"; - Donald::Select::reader($socket,\&mgmt_receive,$socket); + My::Select::reader($socket,sub{mgmt_receive($socket)}); $mgmt_sockets{$socket}=$socket; $socket->print("clusterd ".version_info()." stupid console\n"); $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n"); @@ -849,7 +1123,7 @@ sub mgmt_receive { return; } - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); $data=~s/\r?\n$//; length $data or return; if ($data eq 'l') { @@ -1025,19 +1299,17 @@ sub run_cmd { our $CLP_PORT=235; our $clp_listen_socket; -our %clp_sockets; - -our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF); +our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF,'PULL'=>\&clp_rx_PULL); sub clp_init { - $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); + $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>128,ReuseAddr=>1); defined $clp_listen_socket or die "$!\n"; - Donald::Select::reader($clp_listen_socket,\&clp_connect_request); + My::Select::reader($clp_listen_socket,\&clp_connect_request); } sub clp_connect_request { - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); # listen socket ready @@ -1047,8 +1319,7 @@ sub clp_connect_request { my $buffer=''; - Donald::Select::reader($socket,\&clp_receive,$socket,\$buffer); - $clp_sockets{$socket}=$socket; + My::Select::reader($socket,sub{clp_receive($socket,\$buffer)}); } sub clp_receive { @@ -1056,7 +1327,6 @@ sub clp_receive { my $data; defined $s->recv($data,$TCP_MAX) or return; if (!length($data) ) { - delete $mgmt_sockets{$s}; $s->close; return; } @@ -1069,7 +1339,7 @@ sub clp_receive { $$bufref=substr($$bufref,2+$l); clp_message($s,$msg) and return; } - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); } sub clp_message { @@ -1098,6 +1368,17 @@ sub clp_rx_CMD { return 1; } +# send_tcp_cp($socket,$cb,$timeout,@args) +# +# send a cluster protocoll message over an async tcp socket. +# +# assume $CLUSTER_PW is valid +# +sub send_tcp_cp { + my ($s,$cb,$timeout,@args)=@_; + my $data=sign($CLUSTER_PW,encode(@args)); + My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb); +} #---------------------------------------------------------- #our $CLUSTER_PW; @@ -1142,7 +1423,7 @@ sub sync_cluster_pw { defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n"; $CLUSTER_PW=undef; } - Donald::Select::timeout(60,\&sync_cluster_pw); + My::Select::timeout(60,\&sync_cluster_pw); return defined $CLUSTER_PW; } @@ -1163,7 +1444,7 @@ sub init_area { warn "I am area router for $AREA_ROUTER{$my_hostname}\n"; $area_socket=new IO::Socket::INET (Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n"; - Donald::Select::reader($area_socket,\&area_message,$area_socket); + My::Select::reader($area_socket,sub{area_message($area_socket)}); } sub area_message { @@ -1174,7 +1455,7 @@ sub area_message { my $udp_peer_addr=inet_ntoa($peer_iaddr); $donald_s->send_data($AREA_ROUTER{$my_hostname},$UDP_PORT,$data); # broadcast to our network - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); } sub udp_broadcast_message { @@ -1208,7 +1489,7 @@ sub check_progfile_status { else { $PROG_MTIME=$f[9]; } - Donald::Select::timeout(60,\&check_progfile_status); + My::Select::timeout(60,\&check_progfile_status); } sub version_info { # 'V1.31 - 20090617-155314' @@ -1243,7 +1524,7 @@ sub pload_debug { } sub check_overload() { - Donald::Select::timeout_requeue(600); + My::Select::timeout_requeue(600); if ($CPUS) { my $running_proc_10=running_proc_10(); my $pload_10=running_proc_10()/$CPUS; @@ -1251,14 +1532,6 @@ sub check_overload() { } } -sub ping_mlx { - Donald::Select::timeout_requeue(86400); - system 'ping','-c','1','mlx'; - if ($?) { - warn "failed to ping mlx\n"; - } -} - ##################################################### my $slave=0; @@ -1295,9 +1568,9 @@ sub exec_at { my $pbuffer=''; my $olbuffer=''; my $elbuffer=''; - Donald::Select::reader($s,\&cmd_rx,$host,$s,\$pbuffer,\$olbuffer,\$elbuffer); + My::Select::reader($s,sub{cmd_rx($host,$s,\$pbuffer,\$olbuffer,\$elbuffer)}); $slave=1; - Donald::Select::run() if $slave;; + My::Select::run() if $slave;; } sub lsof { @@ -1314,10 +1587,10 @@ sub lsof { clp_send_message($s,'LSOF',$pattern); my $pbuffer=''; my $olbuffer=''; - Donald::Select::reader($s,\&lsof_rx,$host,$s,\$pbuffer,\$olbuffer); + My::Select::reader($s,sub{lsof_rx($host,$s,\$pbuffer,\$olbuffer)}); $slave++; } - Donald::Select::run() if $slave;; + My::Select::run() if $slave;; } sub lsof_rx { @@ -1339,7 +1612,7 @@ sub lsof_rx { $$pbufref=substr($$pbufref,2+$l); lsof_msg($host,$s,$msg,$olbufref); } - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); } sub lsof_msg { @@ -1370,7 +1643,7 @@ sub cmd_rx { $$pbufref=substr($$pbufref,2+$l); cmd_msg($host,$s,$msg,$olbufref,$elbufref); } - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); } sub cmd_msg { @@ -1396,6 +1669,136 @@ sub cmd_msg { } } +our $SYS_SENDFILE=40; # /usr/include/asm/unistd_64.h + +sub sendfile { + my ($out_fd,$in_fd,$offset,$count)=@_; + my $ret=syscall($SYS_SENDFILE,fileno($out_fd),fileno($in_fd),$offset,$count); + return $ret<0 ? undef : $ret; +} + +sub clp_rx_PULL { + my ($s,$st_want)=@_; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + if (!defined $st_is or $st_is->type ne 'F' or $st_is->size != $st_want->size or $st_is->mtime != $st_want->mtime) { + warn $st_want->name." requested by ".$s->peerhost.": no longer available\n"; + return; + } + my $fh; + unless (open $fh,'<',$st_want->name) { + warn $st_want->name.": $!\n"; + return; + } + my $bytes=$st_is->size; + my $cb_tmo=sub { My::Select::cancel_handle($s); }; + my $cb_write=sub { + my $l=sendfile($s,$fh,0,$bytes); + unless (defined $l) { + warn "$!"; + return; + } + $bytes-=$l; + if ($bytes) { + My::Select::writer_requeue if $bytes; + } else { + close($s); + } + }; + My::Select::timeout(5,$cb_tmo); + My::Select::writer($s,$cb_write); +} + +sub udp_rx_push { + my ($ip,$st_want)=@_; + + my $filename=$st_want->name; + my $tmp_filename="$filename.tmp"; + + $ip eq $my_ip and return; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + + unless ($st_want->type eq 'F') { + warn "$filename: type ".$st_want->type." not yet implemented\n"; + return; + } + + if ($st_is + && $st_is->type eq 'F' + && $st_is->size == $st_want->size + && $st_is->mtime == $st_want->mtime + && $st_is->uid == $st_want->uid + && $st_is->gid == $st_want->gid + && $st_is->perm == $st_want->perm + ) { + warn "$filename: already okay\n"; + return; + } + + if ($st_want->size==0) { + my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0); + defined $fh or return warn "$tmp_filename: $!\n"; + $fh->close; + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + rename($tmp_filename,$filename); + utime($st_want->mtime,$st_want->mtime,$filename); + warn "installed (empty) $filename\n"; + return; + } + + my $s; + $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,$TCP_TIMEOUT,sub { + $! and return warn "$ip: $!\n"; + send_tcp_cp($s,sub { + $! and return warn "$ip: $!\n"; + my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0); + defined $fh or return warn "$tmp_filename: $!\n"; + + my $cb; + my $bytes=$st_want->size; + $cb=sub { + # note, we need to break the circular references $cb of our caller, if no longer needed + my ($buf)=@_; + if ($!) { warn "$ip: $!\n";$cb=undef;return; } + if (length($buf)==0) { warn "$ip: EOF\n";$cb=undef;return;} + print $fh $buf; + $bytes-=length($buf); + if ($bytes>0) { + My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); + return; + } + $cb=undef; + close $fh; + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + rename($tmp_filename,$filename) or return warn "$filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$filename); + warn "installed $filename\n"; + }; + My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); + },$TCP_TIMEOUT,'PULL',$st_want); + }); +} + +sub cmd_push { + my @files=@_; + for my $filename (@files) { + $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; + -e $filename or die "$filename: no such file\n"; + } + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + for my $filename (@files) { + my $st=Donald::FileInfo->lstat($filename); + defined $st or die "$filename: $!\n"; + $st->type eq 'F' or die "$filename: only plain files currently supported\n"; + open my $test,'<',$filename or die "$filename: $!\n"; + udp_broadcast_message($donald_s,'push',$my_ip,$st); + } +} + #------------------------------------------------------------ use constant USAGE => <<'__EOF__'; @@ -1419,6 +1822,8 @@ usage: $0 [options] --foreground # stay in foreground, log to stderr --syslog # log to syslog instead of stderr + push files.... # push files over tcp + __EOF__ use Getopt::Long; @@ -1440,7 +1845,7 @@ GetOptions if (defined $options{'push'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; push_file($donald_s,$options{'push'}); } elsif (defined $options{'exec'}) { if (substr($options{'exec'},0,1) eq '@') { @@ -1449,31 +1854,31 @@ if (defined $options{'push'}) { exec_at(substr($options{'exec'},1),@ARGV); } else { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; send_exec($donald_s,$options{'exec'}); } } elsif (defined $options{'push_amd_tar'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; push_amd_tar($donald_s); } elsif (defined $options{'send-restart'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; udp_broadcast_message($donald_s,'restart'); } elsif (defined $options{'flush-gidcache'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; udp_broadcast_message($donald_s,'flush-gidcache'); } elsif (defined $options{'make-automaps'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; udp_broadcast_message($donald_s,'make-automaps'); } elsif (defined $options{'daemon'}) { $options{'kill'} and Donald::Tools::kill_previous_server('clusterd') and sleep 2; $SIG{PIPE}='IGNORE'; - $donald_s=new Donald::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n"; $donald_s->receive_data(\&udp_message,$donald_s); unless ($options{'foreground'}) { @@ -1500,19 +1905,25 @@ if (defined $options{'push'}) { sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n"; - Donald::Select::timeout(60,\&purge_old_receiver); - Donald::Select::timeout(rand(60),\&send_stat); - Donald::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha; + My::Select::timeout(60,\&purge_old_receiver); + My::Select::timeout(rand(60),\&send_stat); + My::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha; $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init(); $my_hostname eq 'lol' and My::NetlogReceiver::init(); - Donald::Select::timeout(600,\&check_overload); - Donald::Select::timeout(30,\&ping_mlx); + My::Select::timeout(600,\&check_overload); - Donald::Select::run(); + My::Select::run(); } elsif ($options{'lsof'}) { lsof($options{'lsof'}); } elsif ($options{'kill'}) { Donald::Tools::kill_previous_server('clusterd'); } else { - die USAGE; + @ARGV or die USAGE; + my ($cmd,@args)=@ARGV; + if ($cmd eq 'push') { + @args>0 or die USAGE; + cmd_push(@args); + } else { + die USAGE; + } }