Skip to content

Commit

Permalink
Implement PUSH in daemon
Browse files Browse the repository at this point in the history
When a daemon receices a push command, it checks whether it already has
the offered file or not. If not, it calls back to the daemon where the
push originated and pulls the file over tcp.
  • Loading branch information
donald committed Dec 7, 2017
1 parent 305be8a commit 8195517
Showing 1 changed file with 75 additions and 0 deletions.
75 changes: 75 additions & 0 deletions clusterd
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ our %UDP_HANDLER=
'make-automaps' => \&udp_rx_make_automaps,
'log' => \&udp_rx_log,
'exec' => \&udp_rx_exec,
'push' => \&udp_rx_push,
);

sub udp_message {
Expand Down Expand Up @@ -1667,6 +1668,79 @@ sub cmd_msg {
}
}

sub udp_rx_push {
my ($ip,$st_want)=@_;

my $filename=$st_want->name;
my $tmp_filename="$filename.tmp";

$ip eq $my_ip and return;

my $st_is=Donald::FileInfo->lstat($st_want->name);

unless ($st_want->type eq 'F') {
warn "$filename: type ".$st_want->type." not yet implemented\n";
return;
}

if ($st_is
&& $st_is->type eq 'F'
&& $st_is->size == $st_want->size
&& $st_is->mtime == $st_want->mtime
&& $st_is->uid == $st_want->uid
&& $st_is->gid == $st_want->gid
&& $st_is->perm == $st_want->perm
) {
warn "$filename: already okay\n";
return;
}

if ($st_want->size==0) {
my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0);
defined $fh or return warn "$tmp_filename: $!\n";
$fh->close;
chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
rename($tmp_filename,$filename);
utime($st_want->mtime,$st_want->mtime,$filename);
warn "installed (empty) $filename\n";
return;
}

my $s;
$s=My::Select::INET::connect_tcp($ip,$CLP_PORT,5,sub {
$! and return warn "$ip: $!\n";
send_tcp_cp($s,sub {
$! and return warn "$ip: $!\n";
my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0);
defined $fh or return warn "$tmp_filename: $!\n";

my $cb;
my $bytes=$st_want->size;
$cb=sub {
# note, we need to break the circular references $cb of our caller, if no longer needed
my ($buf)=@_;
if ($!) { warn "$ip: $!\n";$cb=undef;return; }
if (length($buf)==0) { warn "$ip: EOF\n";$cb=undef;return;}
print $fh $buf;
$bytes-=length($buf);
if ($bytes>0) {
My::Select::INET::read_with_timeout($s,$cb,5);
return;
}
$cb=undef;
close $fh;
chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
rename($tmp_filename,$filename) or return warn "$filename: $!\n";
utime($st_want->mtime,$st_want->mtime,$filename);
warn "installed $filename\n";
};
My::Select::INET::read_with_timeout($s,$cb,5);
},5,'PULL',$st_want);
});
}

sub cmd_push {
my @files=@_;
for my $filename (@files) {
Expand All @@ -1679,6 +1753,7 @@ sub cmd_push {
my $st=Donald::FileInfo->lstat($filename);
defined $st or die "$filename: $!\n";
$st->type eq 'F' or die "$filename: only plain files currently supported\n";
open my $test,'<',$filename or die "$filename: $!\n";
udp_broadcast_message($donald_s,'push',$my_ip,$st);
}
}
Expand Down

0 comments on commit 8195517

Please sign in to comment.