From d632ac4b55652c8219390854d0ea30c90730ac6f Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 18 Oct 2017 07:48:47 +0200 Subject: [PATCH 01/18] Clean up indentation mix --- clusterd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clusterd b/clusterd index 624ec61..3e439b2 100755 --- a/clusterd +++ b/clusterd @@ -591,7 +591,7 @@ sub udp_rx_filedata { if ($st_want->type eq 'L') { if (!$st_is || $st_is->type ne 'L' || $st_is->target ne $st_want->target) { - $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n"); + $st_is and (unlink($filename) or return warn "$filename: failed to unlink: $!\n"); symlink($st_want->target,$filename) or return warn "$filename: failed to create symlink: $!\n"; lchown($st_want->uid,$st_want->gid,$filename); lmtime($st_want->mtime,$filename); From c9085468c85ecd27f724f4e5d6f26239a3d8e47a Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 18 Oct 2017 14:00:32 +0200 Subject: [PATCH 02/18] Dereference CLP tcp socket after hangup We have a small leak here, that clp sockets keep referenced after the peer closed. Fix. --- clusterd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clusterd b/clusterd index 3e439b2..5a9c30e 100755 --- a/clusterd +++ b/clusterd @@ -1056,7 +1056,7 @@ sub clp_receive { my $data; defined $s->recv($data,$TCP_MAX) or return; if (!length($data) ) { - delete $mgmt_sockets{$s}; + delete $clp_sockets{$s}; $s->close; return; } From 0f589432c8911299c53663e5a3ad4833295a65a9 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 18 Oct 2017 15:04:08 +0200 Subject: [PATCH 03/18] Remove mlx ping --- clusterd | 9 --------- 1 file changed, 9 deletions(-) diff --git a/clusterd b/clusterd index 5a9c30e..e813978 100755 --- a/clusterd +++ b/clusterd @@ -1251,14 +1251,6 @@ sub check_overload() { } } -sub ping_mlx { - Donald::Select::timeout_requeue(86400); - system 'ping','-c','1','mlx'; - if ($?) { - warn "failed to ping mlx\n"; - } -} - ##################################################### my $slave=0; @@ -1506,7 +1498,6 @@ if (defined $options{'push'}) { $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init(); $my_hostname eq 'lol' and My::NetlogReceiver::init(); Donald::Select::timeout(600,\&check_overload); - Donald::Select::timeout(30,\&ping_mlx); Donald::Select::run(); } elsif ($options{'lsof'}) { From 26bc5ad358a00afb30318a098ea1bafa9e2f7018 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sat, 2 Dec 2017 11:12:43 +0100 Subject: [PATCH 04/18] Import Donald::Select into script The design of Donald::Select makes use of Donald::Callback objects, which are objects conainting a sub and call arguments. However, because of the pattern sub do_something_later() { my ($cb_or_sub,@args)=@_; $store_callback_somehere=new Donald::Callback($cb_or_sub,@args); } do_something_later(\&callback,$arg1,$arg2) the caller doesn't have a reference to the Donald::Callback object, which makes its diffucult to identify it, e.g. to cancel the callback. We want to change the design to accept only references to subs as callbacks. Instead of passing arguments, we exepect the caller to make use of closures to pass data to the callback if needed. sub do_something_later() { my ($cb)=@_; $store_callback_somewhere=$cb; } do_something_later(sub{callback($arg1,$args)}); Instead of changing the API of Donald::Select, we import the code directly into clusterd to make the modifications here. --- clusterd | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 210 insertions(+), 2 deletions(-) diff --git a/clusterd b/clusterd index e813978..8d865b4 100755 --- a/clusterd +++ b/clusterd @@ -10,12 +10,220 @@ our $REVISION='1.108'; #use lib ('/home/buczek/cluster/Donald/blib/lib'); use Donald::Tools qw(encode sign check_sign decode); -use Donald::Select; use Donald::FileInfo; -use Donald::Select::INET; use POSIX; use IO::Pipe; +#-------------------------------------- +package Donald::Select; + +use warnings; +use strict; + +our $VERSION = '1.00'; + +use Donald::Callback 1.01 ; +use Donald::Tools; + +our $time=Donald::Tools::uptime(); + +sub Donald::Select::time +{ + return $time; +} + +#----------------------------------------- + +our @TIMER=(); # ( [duetime,cb] , ) sorted by time +our $active_timer_cb; + +sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,subref [,args,...]) +{ + my ($delta,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + + my $duetime=$time+$delta; + @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); + return $cb; +} + +sub timeout_cancel # cb=Select::timeout_cancel(cb) +{ + my ($cb)=@_; + @TIMER=grep {$_->[1] != $cb && $_->[1]->[0] != $cb} @TIMER; + return $cb; +} + +sub timeout_requeue {timeout(shift,$active_timer_cb); } # Select::timeout_requeue(seconds) + +#------------------------------------ + +our @READER; # ( [Handle,cb] , ... ) +our @WRITER; +our @EXCEPT; +our $active_io; # [Handle,cb] + +sub reader # cb = Select::reader(Handle,cb) +{ + my ($handle,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + push @READER,[$handle,$cb]; + return $cb; +} + +sub writer # cb = Select::writer(Handle,cb) +{ + my ($handle,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + push @WRITER,[$handle,$cb]; + return $cb; +} + +sub except # cb = Select::except(Handle,cb) +{ + my ($handle,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + push @EXCEPT,[$handle,$cb]; + return $cb; +} + +sub reader_requeue {push @READER,$active_io} +sub writer_requeue {push @WRITER,$active_io} +sub except_requeue {push @EXCEPT,$active_io} + +sub cancel # $cb = Select::cancel([cb]) +{ + my ($cb)=@_; + defined $cb or $cb=$active_io->[1]; + @READER=grep {$_->[1] != $cb} @READER; + @WRITER=grep {$_->[1] != $cb} @WRITER; + @EXCEPT=grep {$_->[1] != $cb} @EXCEPT; +} + +sub cancel_handle +{ + my ($handle)=@_; + @READER=grep {$_->[0] != $handle} @READER; + @WRITER=grep {$_->[0] != $handle} @WRITER; + @EXCEPT=grep {$_->[0] != $handle} @EXCEPT; +} + +#-------------------------------------------- + +sub heartbeat +{ + $time++; + while (@TIMER && $TIMER[0]->[0]<=$time) { + $active_timer_cb=(shift @TIMER)->[1]; + $active_timer_cb->call(); + } + $active_timer_cb=undef; +} + +sub run +{ + while (1) { + $time=Donald::Tools::uptime(); + while (@TIMER && $TIMER[0]->[0]<=$time) { + $active_timer_cb=(shift @TIMER)->[1]; + $active_timer_cb->call(); + } + $active_timer_cb=undef; + + my ($rvec,$wvec,$evec)=('','',''); + + for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 } ; + for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 } ; + for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ; + + my $ready=select($rvec,$wvec,$evec,1); + if ($ready>0) { + for (my $i=0;$i<@READER;$i++) { + if (vec($rvec,$READER[$i]->[0]->fileno,1)) { + $active_io=splice @READER,$i,1; + $active_io->[1]->call(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@WRITER;$i++) { + if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) { + $active_io=splice @WRITER,$i,1; + $active_io->[1]->call(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@EXCEPT;$i++) { + if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) { + $active_io=splice @EXCEPT,$i,1; + $active_io->[1]->call(); + $active_io=undef; + last; + } + } + } + } +} + +1; +#-------------------------------------- +package Donald::Select::INET ; + +use warnings; +use strict; + +use Carp; +use IO::Socket::INET; +use Digest::MD5; +use Storable; + +our $VERSION = '1.00'; + +our $UDP_MAX=1472; # for broadcast on alphas + + +sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) +{ + my ($class,@args)=@_; + our $socket=new IO::Socket::INET (@args) or return undef; + return bless \$socket,$class; +} + + +sub send_data +{ + my ($self,$ip,$port,$data)=@_; + my $ip_address=inet_aton($ip); + unless (defined $ip_address) {carp("can't resolve $ip\n");return undef} + unless (length($data)<=$UDP_MAX) {carp("message to long\n");return undef} + $$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n"; +} + + +sub reader +{ + my ($self,$sub,@args)=@_; + Donald::Select::reader($$self,$sub,@args); +} + +sub receive_data +{ + my ($self,$sub,@args)=@_; + Donald::Select::reader($$self,\&receive_data_cb,$self,$sub,@args); +} + +sub receive_data_cb +{ + my ($self,$sub,@args)=@_; + my $data; + my $peer = $$self->recv($data,$UDP_MAX); + my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); + my $udp_peer_addr=inet_ntoa($peer_iaddr); + Donald::Select::reader_requeue(); + $sub->($data,$udp_peer_addr,$udp_peer_port,@args); +} + #-------------------------------------- package My::Cluster::Updown; From 35aea6f59501002542b85b7e792930dc5f1ef2d6 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sat, 2 Dec 2017 11:28:18 +0100 Subject: [PATCH 05/18] Rename Donald::Select to My::Select Use the namespace My:: for packages declared inside the main script. --- clusterd | 112 +++++++++++++++++++++++++++---------------------------- 1 file changed, 56 insertions(+), 56 deletions(-) diff --git a/clusterd b/clusterd index 8d865b4..150f802 100755 --- a/clusterd +++ b/clusterd @@ -15,7 +15,7 @@ use POSIX; use IO::Pipe; #-------------------------------------- -package Donald::Select; +package My::Select; use warnings; use strict; @@ -27,7 +27,7 @@ use Donald::Tools; our $time=Donald::Tools::uptime(); -sub Donald::Select::time +sub My::Select::time { return $time; } @@ -168,7 +168,7 @@ sub run 1; #-------------------------------------- -package Donald::Select::INET ; +package My::Select::INET ; use warnings; use strict; @@ -204,13 +204,13 @@ sub send_data sub reader { my ($self,$sub,@args)=@_; - Donald::Select::reader($$self,$sub,@args); + My::Select::reader($$self,$sub,@args); } sub receive_data { my ($self,$sub,@args)=@_; - Donald::Select::reader($$self,\&receive_data_cb,$self,$sub,@args); + My::Select::reader($$self,\&receive_data_cb,$self,$sub,@args); } sub receive_data_cb @@ -220,7 +220,7 @@ sub receive_data_cb my $peer = $$self->recv($data,$UDP_MAX); my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); my $udp_peer_addr=inet_ntoa($peer_iaddr); - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); $sub->($data,$udp_peer_addr,$udp_peer_port,@args); } @@ -312,13 +312,13 @@ sub msg_state { sub init { restore_state; msg_text('node monitor: started. (recovery mode)'); - Donald::Select::timeout(630,sub{$MONITOR_STARTING=0;msg_text('node monitor: recovery finished');}); - Donald::Select::timeout(630,\&timeout_hosts); + My::Select::timeout(630,sub{$MONITOR_STARTING=0;msg_text('node monitor: recovery finished');}); + My::Select::timeout(630,\&timeout_hosts); } sub timeout_hosts { - Donald::Select::timeout_requeue(60); - my $timeout=Donald::Select::time()-1230; # 2x10 minutes + 30 seconds + My::Select::timeout_requeue(60); + my $timeout=My::Select::time()-1230; # 2x10 minutes + 30 seconds for (keys %H) { my $h=$H{$_}; @@ -343,7 +343,7 @@ sub rx_hostannounce { } else { $MONITOR_STARTING or msg_state('NEW','UP',$host,"discovered running node $version - $unixrev"); } - $H{$host}=['UP',Donald::Select::time,$seq+1,@more]; + $H{$host}=['UP',My::Select::time,$seq+1,@more]; } else { my $h=$H{$host}; if ($h->[0] eq 'UP') { @@ -371,7 +371,7 @@ sub rx_hostannounce { msg_state('DOWN','UP',$host,$seq-$h->[2]." packet(s) lost"); } } - @$h=('UP',Donald::Select::time,$seq+1,@more); + @$h=('UP',My::Select::time,$seq+1,@more); } } @@ -404,12 +404,12 @@ sub bigben { sub bigben_timer { bigben(); - Donald::Select::timeout_requeue(30); + My::Select::timeout_requeue(30); } sub bigben_init { $DAY_LAST_MSG=day(); - Donald::Select::timeout(30,\&bigben_timer); + My::Select::timeout(30,\&bigben_timer); } @@ -435,24 +435,24 @@ sub receive { $|=1; warn "NETLOG $msg\n" unless $msg=~/NETLOG/; } - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); } sub connect_request { - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); my $socket=$listen_socket->accept(); $socket->blocking(0); my $peernode=$socket->peerhost; my $buffer=''; - Donald::Select::reader($socket,\&receive,$socket,$peernode,\$buffer); + My::Select::reader($socket,\&receive,$socket,$peernode,\$buffer); # warn "$peernode: connect\n"; } sub init { $listen_socket=new IO::Socket::INET(Proto=>'tcp',LocalPort=>1028,Listen=>1,ReuseAddr=>1) or die "$!\n"; - Donald::Select::reader($listen_socket,\&connect_request); + My::Select::reader($listen_socket,\&connect_request); bigben_init(); } @@ -470,7 +470,7 @@ our $BC_RATE=8; # packets per second broadcast our (%options); # RUN OPTIONS -our $donald_s; # Donald::Select::INET udp socket +our $donald_s; # My::Select::INET udp socket our $my_hostname; our $my_ip; # '141.14.12.12' @@ -686,13 +686,13 @@ my %RECEIVER; # ( filename => $receiver, .... ) sub purge_old_receiver { while (my ($n,$v)=each %RECEIVER) { - if ($v->[1]+10[1]+10[0]->name,"\n"; log_to_stat_target('timeout receiving ',$v->[0]->name); delete $RECEIVER{$n}; } } - Donald::Select::timeout_requeue(60); + My::Select::timeout_requeue(60); } #------------------------------------------------------------- @@ -847,7 +847,7 @@ sub udp_rx_filedata { -e $tmp_filename and unlink($tmp_filename); my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0); defined $fh or return warn "$tmp_filename: $!\n"; - $receiver = [$st_want,Donald::Select::time,$fh,[]]; + $receiver = [$st_want,My::Select::time,$fh,[]]; $RECEIVER{$filename}=$receiver; } @@ -857,7 +857,7 @@ sub udp_rx_filedata { # write data ( size cant be 0 here ) $receiver->[2]->seek($pos,0) or return warn "$tmp_filename: $!\n"; $receiver->[2]->syswrite($data) or return warn "$tmp_filename: $!\n";; - $receiver->[1]=Donald::Select::time; + $receiver->[1]=My::Select::time; my $s=$receiver->[3]=[normalize_seg(@{$receiver->[3]},[$pos,length($data)])]; #warn "$filename: receive $pos length ",length($data)," segments now: ",join(",",map( {'['.$_->[0].','.$_->[1].']'} @{$receiver->[3]})),"\n"; @@ -922,7 +922,7 @@ sub sample_rproc { # every 5 seconds @proc_running_10=(@proc_running_10[1..@proc_running_10-1],running_proc()); $SAMPLE_TICK=0; } - Donald::Select::timeout_requeue(5); + My::Select::timeout_requeue(5); } #----------------------------------------------------------- stat @@ -963,7 +963,7 @@ sub loadavg { # AXP : (system load average) , LINUX: (system load average, pl our $STAT_TARGET='lol'; our $STAT_SEQ=0; sub send_stat { - Donald::Select::timeout_requeue(600); + My::Select::timeout_requeue(600); my ($load_avg,$pload,$pcapacity)=loadavg(); defined $load_avg or return; udp_send_message($STAT_TARGET,'loadavg.2',$my_hostname,$STAT_SEQ++,$load_avg,version_info(),$pload,$pcapacity,$my_unixrev); @@ -1025,12 +1025,12 @@ our %mgmt_sockets; sub mgmt_init { $mgmt_listen_socket=new IO::Socket::INET(LocalPort=>$MGMT_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); defined $mgmt_listen_socket or die "$!\n"; - Donald::Select::reader($mgmt_listen_socket,\&mgmt_connect_request); + My::Select::reader($mgmt_listen_socket,\&mgmt_connect_request); } sub mgmt_connect_request { - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); # listen socket ready @@ -1040,7 +1040,7 @@ sub mgmt_connect_request { ### warn "accepted mgmt connection from $peernode\n"; - Donald::Select::reader($socket,\&mgmt_receive,$socket); + My::Select::reader($socket,\&mgmt_receive,$socket); $mgmt_sockets{$socket}=$socket; $socket->print("clusterd ".version_info()." stupid console\n"); $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n"); @@ -1057,7 +1057,7 @@ sub mgmt_receive { return; } - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); $data=~s/\r?\n$//; length $data or return; if ($data eq 'l') { @@ -1240,12 +1240,12 @@ our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF); sub clp_init { $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); defined $clp_listen_socket or die "$!\n"; - Donald::Select::reader($clp_listen_socket,\&clp_connect_request); + My::Select::reader($clp_listen_socket,\&clp_connect_request); } sub clp_connect_request { - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); # listen socket ready @@ -1255,7 +1255,7 @@ sub clp_connect_request { my $buffer=''; - Donald::Select::reader($socket,\&clp_receive,$socket,\$buffer); + My::Select::reader($socket,\&clp_receive,$socket,\$buffer); $clp_sockets{$socket}=$socket; } @@ -1277,7 +1277,7 @@ sub clp_receive { $$bufref=substr($$bufref,2+$l); clp_message($s,$msg) and return; } - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); } sub clp_message { @@ -1350,7 +1350,7 @@ sub sync_cluster_pw { defined $CLUSTER_PW and warn "$CLUSTER_PW_FILE: $!\n"; $CLUSTER_PW=undef; } - Donald::Select::timeout(60,\&sync_cluster_pw); + My::Select::timeout(60,\&sync_cluster_pw); return defined $CLUSTER_PW; } @@ -1371,7 +1371,7 @@ sub init_area { warn "I am area router for $AREA_ROUTER{$my_hostname}\n"; $area_socket=new IO::Socket::INET (Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n"; - Donald::Select::reader($area_socket,\&area_message,$area_socket); + My::Select::reader($area_socket,\&area_message,$area_socket); } sub area_message { @@ -1382,7 +1382,7 @@ sub area_message { my $udp_peer_addr=inet_ntoa($peer_iaddr); $donald_s->send_data($AREA_ROUTER{$my_hostname},$UDP_PORT,$data); # broadcast to our network - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); } sub udp_broadcast_message { @@ -1416,7 +1416,7 @@ sub check_progfile_status { else { $PROG_MTIME=$f[9]; } - Donald::Select::timeout(60,\&check_progfile_status); + My::Select::timeout(60,\&check_progfile_status); } sub version_info { # 'V1.31 - 20090617-155314' @@ -1451,7 +1451,7 @@ sub pload_debug { } sub check_overload() { - Donald::Select::timeout_requeue(600); + My::Select::timeout_requeue(600); if ($CPUS) { my $running_proc_10=running_proc_10(); my $pload_10=running_proc_10()/$CPUS; @@ -1495,9 +1495,9 @@ sub exec_at { my $pbuffer=''; my $olbuffer=''; my $elbuffer=''; - Donald::Select::reader($s,\&cmd_rx,$host,$s,\$pbuffer,\$olbuffer,\$elbuffer); + My::Select::reader($s,\&cmd_rx,$host,$s,\$pbuffer,\$olbuffer,\$elbuffer); $slave=1; - Donald::Select::run() if $slave;; + My::Select::run() if $slave;; } sub lsof { @@ -1514,10 +1514,10 @@ sub lsof { clp_send_message($s,'LSOF',$pattern); my $pbuffer=''; my $olbuffer=''; - Donald::Select::reader($s,\&lsof_rx,$host,$s,\$pbuffer,\$olbuffer); + My::Select::reader($s,\&lsof_rx,$host,$s,\$pbuffer,\$olbuffer); $slave++; } - Donald::Select::run() if $slave;; + My::Select::run() if $slave;; } sub lsof_rx { @@ -1539,7 +1539,7 @@ sub lsof_rx { $$pbufref=substr($$pbufref,2+$l); lsof_msg($host,$s,$msg,$olbufref); } - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); } sub lsof_msg { @@ -1570,7 +1570,7 @@ sub cmd_rx { $$pbufref=substr($$pbufref,2+$l); cmd_msg($host,$s,$msg,$olbufref,$elbufref); } - Donald::Select::reader_requeue(); + My::Select::reader_requeue(); } sub cmd_msg { @@ -1640,7 +1640,7 @@ GetOptions if (defined $options{'push'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; push_file($donald_s,$options{'push'}); } elsif (defined $options{'exec'}) { if (substr($options{'exec'},0,1) eq '@') { @@ -1649,31 +1649,31 @@ if (defined $options{'push'}) { exec_at(substr($options{'exec'},1),@ARGV); } else { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; send_exec($donald_s,$options{'exec'}); } } elsif (defined $options{'push_amd_tar'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; push_amd_tar($donald_s); } elsif (defined $options{'send-restart'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; udp_broadcast_message($donald_s,'restart'); } elsif (defined $options{'flush-gidcache'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; udp_broadcast_message($donald_s,'flush-gidcache'); } elsif (defined $options{'make-automaps'}) { sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new Donald::Select::INET(Proto=>'udp') or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; udp_broadcast_message($donald_s,'make-automaps'); } elsif (defined $options{'daemon'}) { $options{'kill'} and Donald::Tools::kill_previous_server('clusterd') and sleep 2; $SIG{PIPE}='IGNORE'; - $donald_s=new Donald::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n"; + $donald_s=new My::Select::INET(Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT) or die "$!\n"; $donald_s->receive_data(\&udp_message,$donald_s); unless ($options{'foreground'}) { @@ -1700,14 +1700,14 @@ if (defined $options{'push'}) { sync_cluster_pw() or warn "$CLUSTER_PW_FILE: $!\n"; - Donald::Select::timeout(60,\&purge_old_receiver); - Donald::Select::timeout(rand(60),\&send_stat); - Donald::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha; + My::Select::timeout(60,\&purge_old_receiver); + My::Select::timeout(rand(60),\&send_stat); + My::Select::timeout(0,\&sample_rproc) unless Donald::Tools::is_alpha; $my_hostname eq $STAT_TARGET and My::Cluster::Updown::init(); $my_hostname eq 'lol' and My::NetlogReceiver::init(); - Donald::Select::timeout(600,\&check_overload); + My::Select::timeout(600,\&check_overload); - Donald::Select::run(); + My::Select::run(); } elsif ($options{'lsof'}) { lsof($options{'lsof'}); } elsif ($options{'kill'}) { From 23ab0c940fc0ec62d014617ce80ae72fbe57a8bc Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sat, 2 Dec 2017 11:36:21 +0100 Subject: [PATCH 06/18] Remove declarations no longer needeid after import We imported some lines which do no longer have a function now. Remove them. --- clusterd | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/clusterd b/clusterd index 150f802..01a8acc 100755 --- a/clusterd +++ b/clusterd @@ -17,11 +17,6 @@ use IO::Pipe; #-------------------------------------- package My::Select; -use warnings; -use strict; - -our $VERSION = '1.00'; - use Donald::Callback 1.01 ; use Donald::Tools; @@ -166,20 +161,14 @@ sub run } } -1; #-------------------------------------- package My::Select::INET ; -use warnings; -use strict; - use Carp; use IO::Socket::INET; use Digest::MD5; use Storable; -our $VERSION = '1.00'; - our $UDP_MAX=1472; # for broadcast on alphas From 20de3563dfb78c66cb6c2cd94382185d93ec158a Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sat, 2 Dec 2017 11:38:05 +0100 Subject: [PATCH 07/18] Remove unused sub heartbeat. --- clusterd | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/clusterd b/clusterd index 01a8acc..3f5fb5e 100755 --- a/clusterd +++ b/clusterd @@ -105,16 +105,6 @@ sub cancel_handle #-------------------------------------------- -sub heartbeat -{ - $time++; - while (@TIMER && $TIMER[0]->[0]<=$time) { - $active_timer_cb=(shift @TIMER)->[1]; - $active_timer_cb->call(); - } - $active_timer_cb=undef; -} - sub run { while (1) { From 7ed96d61f0319c966c58e18571028def0a6eec04 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sat, 2 Dec 2017 11:44:20 +0100 Subject: [PATCH 08/18] Remove/fix some comments Non-functional change to bring source into sync with another working branch. --- clusterd | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/clusterd b/clusterd index 3f5fb5e..fd857e1 100755 --- a/clusterd +++ b/clusterd @@ -27,9 +27,7 @@ sub My::Select::time return $time; } -#----------------------------------------- - -our @TIMER=(); # ( [duetime,cb] , ) sorted by time +our @TIMER=(); # ( [duetime,cb] , ... ) sorted by time our $active_timer_cb; sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,subref [,args,...]) @@ -86,7 +84,7 @@ sub reader_requeue {push @READER,$active_io} sub writer_requeue {push @WRITER,$active_io} sub except_requeue {push @EXCEPT,$active_io} -sub cancel # $cb = Select::cancel([cb]) +sub cancel # $cb = Select::cancel(cb) { my ($cb)=@_; defined $cb or $cb=$active_io->[1]; @@ -103,8 +101,6 @@ sub cancel_handle @EXCEPT=grep {$_->[0] != $handle} @EXCEPT; } -#-------------------------------------------- - sub run { while (1) { From 3901aa7dea2ac2749fd3a8457d33548a414cf3ff Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sat, 2 Dec 2017 12:44:19 +0100 Subject: [PATCH 09/18] Remove My::Callback Use refenerces to subs as callback arguments. If the caller wants to pass additional arguments, he can use closures. --- clusterd | 67 ++++++++++++++++++++++++-------------------------------- 1 file changed, 29 insertions(+), 38 deletions(-) diff --git a/clusterd b/clusterd index fd857e1..5d15339 100755 --- a/clusterd +++ b/clusterd @@ -14,10 +14,9 @@ use Donald::FileInfo; use POSIX; use IO::Pipe; -#-------------------------------------- +#------------------------------------- package My::Select; -use Donald::Callback 1.01 ; use Donald::Tools; our $time=Donald::Tools::uptime(); @@ -30,11 +29,9 @@ sub My::Select::time our @TIMER=(); # ( [duetime,cb] , ... ) sorted by time our $active_timer_cb; -sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,subref [,args,...]) +sub timeout # cb=Select::timeout(seconds,cb) { - my ($delta,$cb_or_sub,@args)=@_; - my $cb=new Donald::Callback($cb_or_sub,@args); - + my ($delta,$cb)=@_; my $duetime=$time+$delta; @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); return $cb; @@ -43,7 +40,7 @@ sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,sub sub timeout_cancel # cb=Select::timeout_cancel(cb) { my ($cb)=@_; - @TIMER=grep {$_->[1] != $cb && $_->[1]->[0] != $cb} @TIMER; + @TIMER=grep {$_->[1] != $cb} @TIMER; return $cb; } @@ -58,24 +55,21 @@ our $active_io; # [Handle,cb] sub reader # cb = Select::reader(Handle,cb) { - my ($handle,$cb_or_sub,@args)=@_; - my $cb=new Donald::Callback($cb_or_sub,@args); + my ($handle,$cb)=@_; push @READER,[$handle,$cb]; return $cb; } sub writer # cb = Select::writer(Handle,cb) { - my ($handle,$cb_or_sub,@args)=@_; - my $cb=new Donald::Callback($cb_or_sub,@args); + my ($handle,$cb)=@_; push @WRITER,[$handle,$cb]; return $cb; } sub except # cb = Select::except(Handle,cb) { - my ($handle,$cb_or_sub,@args)=@_; - my $cb=new Donald::Callback($cb_or_sub,@args); + my ($handle,$cb)=@_; push @EXCEPT,[$handle,$cb]; return $cb; } @@ -107,7 +101,7 @@ sub run $time=Donald::Tools::uptime(); while (@TIMER && $TIMER[0]->[0]<=$time) { $active_timer_cb=(shift @TIMER)->[1]; - $active_timer_cb->call(); + $active_timer_cb->(); } $active_timer_cb=undef; @@ -122,7 +116,7 @@ sub run for (my $i=0;$i<@READER;$i++) { if (vec($rvec,$READER[$i]->[0]->fileno,1)) { $active_io=splice @READER,$i,1; - $active_io->[1]->call(); + $active_io->[1]->(); $active_io=undef; last; } @@ -130,7 +124,7 @@ sub run for (my $i=0;$i<@WRITER;$i++) { if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) { $active_io=splice @WRITER,$i,1; - $active_io->[1]->call(); + $active_io->[1]->(); $active_io=undef; last; } @@ -138,7 +132,7 @@ sub run for (my $i=0;$i<@EXCEPT;$i++) { if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) { $active_io=splice @EXCEPT,$i,1; - $active_io->[1]->call(); + $active_io->[1]->(); $active_io=undef; last; } @@ -157,7 +151,6 @@ use Storable; our $UDP_MAX=1472; # for broadcast on alphas - sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) { my ($class,@args)=@_; @@ -178,25 +171,23 @@ sub send_data sub reader { - my ($self,$sub,@args)=@_; - My::Select::reader($$self,$sub,@args); + my ($self,$sub)=@_; + My::Select::reader($$self,$sub); } sub receive_data { - my ($self,$sub,@args)=@_; - My::Select::reader($$self,\&receive_data_cb,$self,$sub,@args); -} + my ($self,$cb,@args)=@_; -sub receive_data_cb -{ - my ($self,$sub,@args)=@_; - my $data; - my $peer = $$self->recv($data,$UDP_MAX); - my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); - my $udp_peer_addr=inet_ntoa($peer_iaddr); - My::Select::reader_requeue(); - $sub->($data,$udp_peer_addr,$udp_peer_port,@args); + my $receive_data_cb=sub { + my $data; + my $peer = $$self->recv($data,$UDP_MAX); + my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); + my $udp_peer_addr=inet_ntoa($peer_iaddr); + My::Select::reader_requeue(); + $cb->($data,$udp_peer_addr,$udp_peer_port); + }; + My::Select::reader($$self,$receive_data_cb); } #-------------------------------------- @@ -421,7 +412,7 @@ sub connect_request { my $peernode=$socket->peerhost; my $buffer=''; - My::Select::reader($socket,\&receive,$socket,$peernode,\$buffer); + My::Select::reader($socket,sub{receive($socket,$peernode,\$buffer)}); # warn "$peernode: connect\n"; } @@ -1015,7 +1006,7 @@ sub mgmt_connect_request { ### warn "accepted mgmt connection from $peernode\n"; - My::Select::reader($socket,\&mgmt_receive,$socket); + My::Select::reader($socket,sub{mgmt_receive($socket)}); $mgmt_sockets{$socket}=$socket; $socket->print("clusterd ".version_info()." stupid console\n"); $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n"); @@ -1230,7 +1221,7 @@ sub clp_connect_request { my $buffer=''; - My::Select::reader($socket,\&clp_receive,$socket,\$buffer); + My::Select::reader($socket,sub{clp_receive($socket,\$buffer)}); $clp_sockets{$socket}=$socket; } @@ -1346,7 +1337,7 @@ sub init_area { warn "I am area router for $AREA_ROUTER{$my_hostname}\n"; $area_socket=new IO::Socket::INET (Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n"; - My::Select::reader($area_socket,\&area_message,$area_socket); + My::Select::reader($area_socket,sub{area_message($area_socket)}); } sub area_message { @@ -1470,7 +1461,7 @@ sub exec_at { my $pbuffer=''; my $olbuffer=''; my $elbuffer=''; - My::Select::reader($s,\&cmd_rx,$host,$s,\$pbuffer,\$olbuffer,\$elbuffer); + My::Select::reader($s,sub{cmd_rx($host,$s,\$pbuffer,\$olbuffer,\$elbuffer)}); $slave=1; My::Select::run() if $slave;; } @@ -1489,7 +1480,7 @@ sub lsof { clp_send_message($s,'LSOF',$pattern); my $pbuffer=''; my $olbuffer=''; - My::Select::reader($s,\&lsof_rx,$host,$s,\$pbuffer,\$olbuffer); + My::Select::reader($s,sub{lsof_rx($host,$s,\$pbuffer,\$olbuffer)}); $slave++; } My::Select::run() if $slave;; From fa4a96e6747af296598258c1a6631f58267cb323 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sat, 2 Dec 2017 12:55:50 +0100 Subject: [PATCH 10/18] Remove/Move imports --- clusterd | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/clusterd b/clusterd index 5d15339..0689eef 100755 --- a/clusterd +++ b/clusterd @@ -13,12 +13,11 @@ use Donald::Tools qw(encode sign check_sign decode); use Donald::FileInfo; use POSIX; use IO::Pipe; +use Digest::MD5; #------------------------------------- package My::Select; -use Donald::Tools; - our $time=Donald::Tools::uptime(); sub My::Select::time @@ -146,8 +145,6 @@ package My::Select::INET ; use Carp; use IO::Socket::INET; -use Digest::MD5; -use Storable; our $UDP_MAX=1472; # for broadcast on alphas From e61ecdaae3065fbbb7fd7dabed703e32f7613578 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 6 Dec 2017 16:57:27 +0100 Subject: [PATCH 11/18] Remove clp_sockets hash This hash is not used but it prevents these sockets to be able to be closed by running out of scope. --- clusterd | 4 ---- 1 file changed, 4 deletions(-) diff --git a/clusterd b/clusterd index 0689eef..06989b6 100755 --- a/clusterd +++ b/clusterd @@ -1196,8 +1196,6 @@ sub run_cmd { our $CLP_PORT=235; our $clp_listen_socket; -our %clp_sockets; - our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF); sub clp_init { @@ -1219,7 +1217,6 @@ sub clp_connect_request { my $buffer=''; My::Select::reader($socket,sub{clp_receive($socket,\$buffer)}); - $clp_sockets{$socket}=$socket; } sub clp_receive { @@ -1227,7 +1224,6 @@ sub clp_receive { my $data; defined $s->recv($data,$TCP_MAX) or return; if (!length($data) ) { - delete $clp_sockets{$s}; $s->close; return; } From 01e9dc44124e5502af271cfab4678b6697a5f1c0 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 6 Dec 2017 17:11:15 +0100 Subject: [PATCH 12/18] Add functions for nonblocking tcp to My::Select::INET --- clusterd | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/clusterd b/clusterd index 06989b6..8ee7aea 100755 --- a/clusterd +++ b/clusterd @@ -148,6 +148,15 @@ use IO::Socket::INET; our $UDP_MAX=1472; # for broadcast on alphas +our $SOL_SOCKET=1; +our $SO_ERROR=4; + +sub get_socket_error +{ + my ($s)=@_; + return unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR)); +} + sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) { my ($class,@args)=@_; @@ -187,6 +196,98 @@ sub receive_data My::Select::reader($$self,$receive_data_cb); } +# send_tcp($socket,$data,$timeout,$cb); +# +# send data asynchronously over noblocking tcp socket +# call callback when done ($!=0) or on error ($! set) +# +# all arguments required +# +sub send_tcp { + my ($s,$data,$timeout,$cb)=@_; + my $len=$s->send($data,0); + defined $len or return $cb->(); + if ($len==length($data)) { + $!=0; + $cb->(); + return; + } + my $pos=$len; + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(); + }; + my $cb_write=sub { + My::Select::timeout_cancel($cb_tmo); + my $len=send($s,substr($data,$pos),0); + defined $len or return $cb->(); + if ($len==length($data)-$pos) { + $!=0; + $cb->(); + return; + } + $pos+=$len; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer_requeue(); + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer($s,$cb_write); +} + +# $socket = connect_tcp ($ip,$port,$timeout,$cb) +# +# asynchronously connect to tcp socket. +# call callback when done or on error (with $! set) +# +# all arguments required +# +sub connect_tcp { + my ($ip,$port,$timeout,$cb)=@_; + + my $s=new IO::Socket::INET (PeerAddr=>$ip,PeerPort=>$port,Blocking=>0); + defined $s or return $cb->(); + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(); + }; + my $cb_write=sub { + My::Select::timeout_cancel($cb_tmo); + $!=get_socket_error($s); + $cb->(); + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer($s,$cb_write); + return $s; +} + +# read_with_timeout($socket,$callback,$timeout) +# +# asynchronously read from tcp socket. +# +sub read_with_timeout { + my ($s,$cb,$timeout)=@_; + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(undef); + }; + my $cb_read=sub { + My::Select::timeout_cancel($cb_tmo); + my $buf=''; + my $l=sysread($s,$buf,102400,0); + if (!defined $l) { + $cb->(undef); + } else { + $!=0; + $cb->($buf); + } + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::reader($s,$cb_read) +} + #-------------------------------------- package My::Cluster::Updown; From 676ca6d7a657c8996fcfa4633a0d3cbf81f59e4d Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 6 Dec 2017 17:13:59 +0100 Subject: [PATCH 13/18] Add send_tcp_cp() to send a cluster protocol message over tcp --- clusterd | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/clusterd b/clusterd index 8ee7aea..fcefac3 100755 --- a/clusterd +++ b/clusterd @@ -1366,6 +1366,17 @@ sub clp_rx_CMD { return 1; } +# send_tcp_cp($socket,$cb,$timeout,@args) +# +# send a cluster protocoll message over an async tcp socket. +# +# assume $CLUSTER_PW is valid +# +sub send_tcp_cp { + my ($s,$cb,$timeout,@args)=@_; + my $data=sign($CLUSTER_PW,encode(@args)); + My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb); +} #---------------------------------------------------------- #our $CLUSTER_PW; From 305be8a46c8acf6a612d6ca4b1a5dee6dd469403 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 6 Dec 2017 17:16:32 +0100 Subject: [PATCH 14/18] Add command "clusterd push files..." The command "clusterd push file..." has a new syntax with a command verb. (as opposed to "clusterd --push file") The push command is distributed via the area routers to the cluster daemons on all nodes. It is intended that the cluster daemons call back over tcp to the originator to pull the file if needed. --- clusterd | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/clusterd b/clusterd index fcefac3..1cf5f3f 100755 --- a/clusterd +++ b/clusterd @@ -1667,6 +1667,22 @@ sub cmd_msg { } } +sub cmd_push { + my @files=@_; + for my $filename (@files) { + $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; + -e $filename or die "$filename: no such file\n"; + } + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + for my $filename (@files) { + my $st=Donald::FileInfo->lstat($filename); + defined $st or die "$filename: $!\n"; + $st->type eq 'F' or die "$filename: only plain files currently supported\n"; + udp_broadcast_message($donald_s,'push',$my_ip,$st); + } +} + #------------------------------------------------------------ use constant USAGE => <<'__EOF__'; @@ -1690,6 +1706,8 @@ usage: $0 [options] --foreground # stay in foreground, log to stderr --syslog # log to syslog instead of stderr + push files.... # push files over tcp + __EOF__ use Getopt::Long; @@ -1784,5 +1802,12 @@ if (defined $options{'push'}) { } elsif ($options{'kill'}) { Donald::Tools::kill_previous_server('clusterd'); } else { - die USAGE; + @ARGV or die USAGE; + my ($cmd,@args)=@ARGV; + if ($cmd eq 'push') { + @args>0 or die USAGE; + cmd_push(@args); + } else { + die USAGE; + } } From 81955171dc218579dc68e55b6bbace84092d405d Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 6 Dec 2017 17:19:15 +0100 Subject: [PATCH 15/18] Implement PUSH in daemon When a daemon receices a push command, it checks whether it already has the offered file or not. If not, it calls back to the daemon where the push originated and pulls the file over tcp. --- clusterd | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/clusterd b/clusterd index 1cf5f3f..b9e834e 100755 --- a/clusterd +++ b/clusterd @@ -582,6 +582,7 @@ our %UDP_HANDLER= 'make-automaps' => \&udp_rx_make_automaps, 'log' => \&udp_rx_log, 'exec' => \&udp_rx_exec, + 'push' => \&udp_rx_push, ); sub udp_message { @@ -1667,6 +1668,79 @@ sub cmd_msg { } } +sub udp_rx_push { + my ($ip,$st_want)=@_; + + my $filename=$st_want->name; + my $tmp_filename="$filename.tmp"; + + $ip eq $my_ip and return; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + + unless ($st_want->type eq 'F') { + warn "$filename: type ".$st_want->type." not yet implemented\n"; + return; + } + + if ($st_is + && $st_is->type eq 'F' + && $st_is->size == $st_want->size + && $st_is->mtime == $st_want->mtime + && $st_is->uid == $st_want->uid + && $st_is->gid == $st_want->gid + && $st_is->perm == $st_want->perm + ) { + warn "$filename: already okay\n"; + return; + } + + if ($st_want->size==0) { + my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0); + defined $fh or return warn "$tmp_filename: $!\n"; + $fh->close; + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + rename($tmp_filename,$filename); + utime($st_want->mtime,$st_want->mtime,$filename); + warn "installed (empty) $filename\n"; + return; + } + + my $s; + $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,5,sub { + $! and return warn "$ip: $!\n"; + send_tcp_cp($s,sub { + $! and return warn "$ip: $!\n"; + my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0); + defined $fh or return warn "$tmp_filename: $!\n"; + + my $cb; + my $bytes=$st_want->size; + $cb=sub { + # note, we need to break the circular references $cb of our caller, if no longer needed + my ($buf)=@_; + if ($!) { warn "$ip: $!\n";$cb=undef;return; } + if (length($buf)==0) { warn "$ip: EOF\n";$cb=undef;return;} + print $fh $buf; + $bytes-=length($buf); + if ($bytes>0) { + My::Select::INET::read_with_timeout($s,$cb,5); + return; + } + $cb=undef; + close $fh; + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + rename($tmp_filename,$filename) or return warn "$filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$filename); + warn "installed $filename\n"; + }; + My::Select::INET::read_with_timeout($s,$cb,5); + },5,'PULL',$st_want); + }); +} + sub cmd_push { my @files=@_; for my $filename (@files) { @@ -1679,6 +1753,7 @@ sub cmd_push { my $st=Donald::FileInfo->lstat($filename); defined $st or die "$filename: $!\n"; $st->type eq 'F' or die "$filename: only plain files currently supported\n"; + open my $test,'<',$filename or die "$filename: $!\n"; udp_broadcast_message($donald_s,'push',$my_ip,$st); } } From fee25ebf0a361a4ec6395240a1d8e30fee80029a Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 6 Dec 2017 17:20:51 +0100 Subject: [PATCH 16/18] Implement PULL in daemon Implement the server side to pull a file over tcp. --- clusterd | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/clusterd b/clusterd index b9e834e..3b44b15 100755 --- a/clusterd +++ b/clusterd @@ -1298,7 +1298,7 @@ sub run_cmd { our $CLP_PORT=235; our $clp_listen_socket; -our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF); +our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF,'PULL'=>\&clp_rx_PULL); sub clp_init { $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); @@ -1668,6 +1668,46 @@ sub cmd_msg { } } +our $SYS_SENDFILE=40; # /usr/include/asm/unistd_64.h + +sub sendfile { + my ($out_fd,$in_fd,$offset,$count)=@_; + my $ret=syscall($SYS_SENDFILE,fileno($out_fd),fileno($in_fd),$offset,$count); + return $ret<0 ? undef : $ret; +} + +sub clp_rx_PULL { + my ($s,$st_want)=@_; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + if (!defined $st_is or $st_is->type ne 'F' or $st_is->size != $st_want->size or $st_is->mtime != $st_want->mtime) { + warn $st_want->name." requested by ".$s->peerhost.": no longer available\n"; + return; + } + my $fh; + unless (open $fh,'<',$st_want->name) { + warn $st_want->name.": $!\n"; + return; + } + my $bytes=$st_is->size; + my $cb_tmo=sub { My::Select::cancel_handle($s); }; + my $cb_write=sub { + my $l=sendfile($s,$fh,0,$bytes); + unless (defined $l) { + warn "$!"; + return; + } + $bytes-=$l; + if ($bytes) { + My::Select::writer_requeue if $bytes; + } else { + close($s); + } + }; + My::Select::timeout(5,$cb_tmo); + My::Select::writer($s,$cb_write); +} + sub udp_rx_push { my ($ip,$st_want)=@_; From 1b1370db7261b6629db0267f0e5aeb87e19811b1 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 7 Dec 2017 12:58:34 +0100 Subject: [PATCH 17/18] Increase TCP timeout from 5 to 30 While 5 seconds seems to be enough for normal behaviour, we might need more time if the daemon is slowed down .e.g. by strace. --- clusterd | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/clusterd b/clusterd index 3b44b15..60fc54f 100755 --- a/clusterd +++ b/clusterd @@ -531,6 +531,7 @@ use Data::Dumper; our $UDP_MAX=1472; # for broadcast on alphas our $UDP_PORT=234; our $BC_RATE=8; # packets per second broadcast +our $TCP_TIMEOUT=30; # default timeout for tcp processing our (%options); # RUN OPTIONS @@ -1748,7 +1749,7 @@ sub udp_rx_push { } my $s; - $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,5,sub { + $s=My::Select::INET::connect_tcp($ip,$CLP_PORT,$TCP_TIMEOUT,sub { $! and return warn "$ip: $!\n"; send_tcp_cp($s,sub { $! and return warn "$ip: $!\n"; @@ -1765,7 +1766,7 @@ sub udp_rx_push { print $fh $buf; $bytes-=length($buf); if ($bytes>0) { - My::Select::INET::read_with_timeout($s,$cb,5); + My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); return; } $cb=undef; @@ -1776,8 +1777,8 @@ sub udp_rx_push { utime($st_want->mtime,$st_want->mtime,$filename); warn "installed $filename\n"; }; - My::Select::INET::read_with_timeout($s,$cb,5); - },5,'PULL',$st_want); + My::Select::INET::read_with_timeout($s,$cb,$TCP_TIMEOUT); + },$TCP_TIMEOUT,'PULL',$st_want); }); } From d1e725e42ee725c87c53db155d48e9487d76ed39 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Thu, 7 Dec 2017 13:22:34 +0100 Subject: [PATCH 18/18] Increase listen queue size for clp port With 234 hosts pulling a file we failed to service a single on with the old setting listen=1. --- clusterd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clusterd b/clusterd index 60fc54f..97867d9 100755 --- a/clusterd +++ b/clusterd @@ -1302,7 +1302,7 @@ our $clp_listen_socket; our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF,'PULL'=>\&clp_rx_PULL); sub clp_init { - $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); + $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>128,ReuseAddr=>1); defined $clp_listen_socket or die "$!\n"; My::Select::reader($clp_listen_socket,\&clp_connect_request); }