Skip to content

Commit

Permalink
cluster: Add --post option for push
Browse files Browse the repository at this point in the history
Change protocol for `clusterd push`:

- Multiple files can be offered with one command invocation and one
  UDP broadcast
- The files are actually transferred by calling back with tcp to the
  station which offered the files. This allows the receiving node to
  restrict file distribution sources to specific nodes.
- The list of "trusted nodes" is set to `afk` and `wtf`.
- A list of predefined commands can be given in addition to the list of
  offered files. The commands are executed after the files are
  transferred. This is required, because from the point of view of the
  caller of `cluster push`, the process is asynchronous, and so these
  commands can not be given with additional `clusterd exec` or
  `clusterd --exec` commands.

Expected usage is something like this:

    clusterd push /etc/mxpolicy --post mkmotd

After the daemons have updated, callers should be switches to the new
commands and then the old commands should be removed.
  • Loading branch information
donald committed Jan 27, 2025
1 parent 05bcfb0 commit 5eddb03
Showing 1 changed file with 139 additions and 9 deletions.
148 changes: 139 additions & 9 deletions clusterd/clusterd
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ our %UDP_HANDLER = (
'exec' => \&udp_rx_exec,
'exec.2' => \&udp_rx_exec2,
'push' => \&udp_rx_push,
'push.2' => \&udp_rx_push2,
);

sub udp_message {
Expand Down Expand Up @@ -1580,6 +1581,13 @@ sub send_tcp_cp {
my $data=sign($CLUSTER_PW,encode(@args));
My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb);
}

sub send_tcp_cp_sync {
my ($s, @args) = @_;
my $data = sign($CLUSTER_PW, encode(@args));
$s->printflush(pack('n', length($data)) . $data);
}

#----------------------------------------------------------

sub sync_cluster_pw {
Expand Down Expand Up @@ -1936,21 +1944,142 @@ sub udp_rx_push {
});
}

our %TRUSTED_IP = (
'141.14.28.170' => 1, # afk
'141.14.16.131' => 1, # wtf
);

sub is_trusted_ip {
my ($ip) = @_;
return exists $TRUSTED_IP{$ip} ? 1 : 0;
}

sub udp_rx_push2 {
my ($ip, $st_ary, $post_ary) = @_;

unless (is_trusted_ip($ip)) {
warn "reject to pull files from $ip : not trusted\n";
return;
}
my $pid = fork;
unless (defined $pid) {
warn "$!\n";
return
}
$pid != 0 and return;

if ($ip ne $my_ip) {
FILE:
for my $st_want (@$st_ary) {
my $filename=$st_want->name;
my $tmp_filename="$filename.tmp";
unless ($st_want->type eq 'F') {
warn "$filename: type ".$st_want->type." not yet implemented\n";
next;
}
my $st_is = Donald::FileInfo->lstat($st_want->name);
if ($st_is
&& $st_is->type eq 'F'
&& $st_is->size == $st_want->size
&& $st_is->mtime == $st_want->mtime
&& $st_is->uid == $st_want->uid
&& $st_is->gid == $st_want->gid
&& $st_is->perm == $st_want->perm
) {
warn "$filename: already okay\n";
next;
}
if ($st_want->size == 0) {
-e $tmp_filename and unlink($tmp_filename);
my $fh = IO::File->new($tmp_filename, O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW, 0);
unless (defined $fh) {
warn "$tmp_filename: $!\n";
next;
}
# no need to fsync empty file
chown $st_want->uid, $st_want->gid, $tmp_filename or return warn "$tmp_filename: $!\n";
chmod $st_want->perm, $tmp_filename or return warn "$tmp_filename: $!\n";
utime($st_want->mtime, $st_want->mtime, $tmp_filename);
rename($tmp_filename, $filename) or return warn "rename $tmp_filename $filename: $!\n";;
warn "installed (empty) $filename\n";
next;
}
my $s = new IO::Socket::INET(PeerAddr => $ip, PeerPort => $CLP_PORT);
unless ($s) {
warn "$ip: $!\n";
next;
}
send_tcp_cp_sync($s, 'PULL', $st_want);
-e $tmp_filename and unlink($tmp_filename);
my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW,0);
unless (defined $fh) {
warn "$tmp_filename: $!\n";
next;
}
my $bytes = $st_want->size;
while (1) {
my $buf = "";
my $len = read($s, $buf, 1024);
if ($len < 0) {
warn "$ip $filename: $!\n";
next FILE;
}
if ($len == 0) {
warn "$ip $filename: file received to short\n";
next FILE;
}
if ($len > $bytes) {
warn "$ip $filename: file received to long\n";
next FILE;
}
print $fh $buf;
$bytes -= $len;
if ($bytes == 0) {
$fh->flush();
$fh->sync();
chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
utime($st_want->mtime,$st_want->mtime,$tmp_filename);
rename($tmp_filename,$filename) or return warn "rename $tmp_filename $filename: $!\n";
warn "installed $filename\n";
next FILE;
}
}
}
}
open STDIN, '<', '/dev/null';
chdir '/';
alarm(60);
for my $cmd (@$post_ary) {
exists $CMD{$cmd} and warn "executing $CMD{$cmd}\n";
exists $CMD{$cmd} and system '/bin/sh', '-c', $CMD{$cmd};
}
exit;
}

sub cmd_push {
my @files=@_;
my ($post, @files) = @_;

is_trusted_ip($my_ip) or die "This command only works on a trusted host\n";

for my $cmd (@$post) {
exists $CMD{$cmd} or die "$cmd: only these commands are allowed: " . join(', ', keys %CMD) . "\n";
}
for my $filename (@files) {
$filename =~ m"^/" or return warn "$filename: please use absolute path\n";
-e $filename or die "$filename: no such file\n";
}
sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
$donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n";
my @st = ();
for my $filename (@files) {
my $st=Donald::FileInfo->lstat($filename);
my $st = Donald::FileInfo->lstat($filename);
defined $st or die "$filename: $!\n";
$st->type eq 'F' or die "$filename: only plain files currently supported\n";
open my $test,'<',$filename or die "$filename: $!\n";
udp_broadcast_message($donald_s,'push',$my_ip,$st);
open my $test,'<', $filename or die "$filename: $!\n";
push @st, $st;
}
sync_cluster_pw() or die "$CLUSTER_PW_FILE: $!\n";
$donald_s=new My::Select::INET(Proto=>'udp') or die "$!\n";
udp_broadcast_message($donald_s, 'push.2', $my_ip, \@st, $post);
}

sub cmd_exec {
Expand Down Expand Up @@ -2007,8 +2136,8 @@ usage: $0 [options]
--daemon # start a daemon
push files.... # push files over tcp
exec CMD... # execute CMD on all nodes
push [--post CMD] files... # push files over tcp
exec CMD... # execute CMD on all nodes
CMD : mkmotd | flush-gidcache | reexport | make-automaps
__EOF__
Expand All @@ -2024,6 +2153,7 @@ GetOptions (
'make-automaps' => \$options{'make-automaps'},
'reexport' => \$options{'reexport'},
'lsof=s' => \$options{'lsof'},
'post=s@' => \$options{'post'},

) or die USAGE;

Expand Down Expand Up @@ -2096,7 +2226,7 @@ if (defined $options{'push'}) {
my ($cmd,@args)=@ARGV;
if ($cmd eq 'push') {
@args>0 or die USAGE;
cmd_push(@args);
cmd_push($options{'post'} || [], @args);
} elsif ($cmd eq 'exec') {
@args > 0 or die USAGE;
cmd_exec(@args);
Expand Down

0 comments on commit 5eddb03

Please sign in to comment.