From aa75eec81b145782c790b66f685c573c64ad8f33 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Mon, 27 Jan 2025 18:54:20 +0100 Subject: [PATCH] cluster: Add --post option for push Change protocol for `clusterd push`: - Multiple files can be offered with one command invocation and one UDP broadcast - The files are actually transferred by calling back with tcp to the station which offered the files. This allows the receiving node to restrict file distribution sources to specific nodes. - The list of "trusted nodes" is set to `afk` and `wtf`. - A list of predefined commands can be given in addition to the list of offered files. The commands are executed after the files are transferred. This is required, because from the point of view of the caller of `cluster push`, the process is asynchronous, and so these commands can not be given with additional `clusterd exec` or `clusterd --exec` commands. Expected usage is something like this: clusterd push /etc/mxpolicy --post mkmotd After the daemons have updated, callers should be switches to the new commands and then the old commands should be removed. --- clusterd/clusterd | 148 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 139 insertions(+), 9 deletions(-) diff --git a/clusterd/clusterd b/clusterd/clusterd index c417ad2..17e13fa 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -771,6 +771,7 @@ our %UDP_HANDLER = ( 'exec' => \&udp_rx_exec, 'exec.2' => \&udp_rx_exec2, 'push' => \&udp_rx_push, + 'push.2' => \&udp_rx_push2, ); sub udp_message { @@ -1580,6 +1581,13 @@ sub send_tcp_cp { my $data=sign($CLUSTER_PW,encode(@args)); My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb); } + +sub send_tcp_cp_sync { + my ($s, @args) = @_; + my $data = sign($CLUSTER_PW, encode(@args)); + $s->printflush(pack('n', length($data)) . $data); +} + #---------------------------------------------------------- sub sync_cluster_pw { @@ -1936,21 +1944,142 @@ sub udp_rx_push { }); } +our %TRUSTED_IP = ( + '141.14.28.170' => 1, # afk + '141.14.16.131' => 1, # wtf +); + +sub is_trusted_ip { + my ($ip) = @_; + return exists $TRUSTED_IP{$ip} ? 1 : 0; +} + +sub udp_rx_push2 { + my ($ip, $st_ary, $post_ary) = @_; + + unless (is_trusted_ip($ip)) { + warn "reject to pull files from $ip : not trusted\n"; + return; + } + my $pid = fork; + unless (defined $pid) { + warn "$!\n"; + return + } + $pid != 0 and return; + + if ($ip ne $my_ip) { +FILE: + for my $st_want (@$st_ary) { + my $filename=$st_want->name; + my $tmp_filename="$filename.tmp"; + unless ($st_want->type eq 'F') { + warn "$filename: type ".$st_want->type." not yet implemented\n"; + next; + } + my $st_is = Donald::FileInfo->lstat($st_want->name); + if ($st_is + && $st_is->type eq 'F' + && $st_is->size == $st_want->size + && $st_is->mtime == $st_want->mtime + && $st_is->uid == $st_want->uid + && $st_is->gid == $st_want->gid + && $st_is->perm == $st_want->perm + ) { + warn "$filename: already okay\n"; + next; + } + if ($st_want->size == 0) { + -e $tmp_filename and unlink($tmp_filename); + my $fh = IO::File->new($tmp_filename, O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW, 0); + unless (defined $fh) { + warn "$tmp_filename: $!\n"; + next; + } + # no need to fsync empty file + chown $st_want->uid, $st_want->gid, $tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm, $tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime, $st_want->mtime, $tmp_filename); + rename($tmp_filename, $filename) or return warn "rename $tmp_filename $filename: $!\n";; + warn "installed (empty) $filename\n"; + next; + } + my $s = new IO::Socket::INET(PeerAddr => $ip, PeerPort => $CLP_PORT); + unless ($s) { + warn "$ip: $!\n"; + next; + } + send_tcp_cp_sync($s, 'PULL', $st_want); + -e $tmp_filename and unlink($tmp_filename); + my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0); + unless (defined $fh) { + warn "$tmp_filename: $!\n"; + next; + } + my $bytes = $st_want->size; + while (1) { + my $buf = ""; + my $len = read($s, $buf, 1024); + if ($len < 0) { + warn "$ip $filename: $!\n"; + next FILE; + } + if ($len == 0) { + warn "$ip $filename: file received to short\n"; + next FILE; + } + if ($len > $bytes) { + warn "$ip $filename: file received to long\n"; + next FILE; + } + print $fh $buf; + $bytes -= $len; + if ($bytes == 0) { + $fh->flush(); + $fh->sync(); + chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n"; + chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n"; + utime($st_want->mtime,$st_want->mtime,$tmp_filename); + rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n"; + warn "installed $filename\n"; + next FILE; + } + } + } + } + open STDIN, '<', '/dev/null'; + chdir '/'; + alarm(60); + for my $cmd (@$post_ary) { + exists $CMD{$cmd} and warn "executing $CMD{$cmd}\n"; + exists $CMD{$cmd} and system '/bin/sh', '-c', $CMD{$cmd}; + } + exit; +} + sub cmd_push { - my @files=@_; + my ($post, @files) = @_; + + is_trusted_ip($my_ip) or die "This command only works on a trusted host\n"; + + for my $cmd (@$post) { + exists $CMD{$cmd} or die "$cmd: only these commands are allowed: " . join(', ', keys %CMD) . "\n"; + } for my $filename (@files) { $filename =~ m"^/" or return warn "$filename: please use absolute path\n"; -e $filename or die "$filename: no such file\n"; } - sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; - $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + my @st = (); for my $filename (@files) { - my $st=Donald::FileInfo->lstat($filename); + my $st = Donald::FileInfo->lstat($filename); defined $st or die "$filename: $!\n"; $st->type eq 'F' or die "$filename: only plain files currently supported\n"; - open my $test,'<',$filename or die "$filename: $!\n"; - udp_broadcast_message($donald_s,'push',$my_ip,$st); + open my $test,'<', $filename or die "$filename: $!\n"; + push @st, $st; } + sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n"; + $donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n"; + udp_broadcast_message($donald_s, 'push.2', $my_ip, \@st, $post); } sub cmd_exec { @@ -2007,8 +2136,8 @@ usage: $0 [options] --daemon # start a daemon - push files.... # push files over tcp - exec CMD... # execute CMD on all nodes + push [--post CMD] files... # push files over tcp + exec CMD... # execute CMD on all nodes CMD : mkmotd | flush-gidcache | reexport | make-automaps __EOF__ @@ -2024,6 +2153,7 @@ GetOptions ( 'make-automaps' => \$options{'make-automaps'}, 'reexport' => \$options{'reexport'}, 'lsof=s' => \$options{'lsof'}, + 'post=s@' => \$options{'post'}, ) or die USAGE; @@ -2096,7 +2226,7 @@ if (defined $options{'push'}) { my ($cmd,@args)=@ARGV; if ($cmd eq 'push') { @args>0 or die USAGE; - cmd_push(@args); + cmd_push($options{'post'} || [], @args); } elsif ($cmd eq 'exec') { @args > 0 or die USAGE; cmd_exec(@args);