From 81955171dc218579dc68e55b6bbace84092d405d Mon Sep 17 00:00:00 2001
From: Donald Buczek <buczek@molgen.mpg.de>
Date: Wed, 6 Dec 2017 17:19:15 +0100
Subject: [PATCH] Implement PUSH in daemon

When a daemon receices a push command, it checks whether it already has
the offered file or not. If not, it calls back to the daemon where the
push originated and pulls the file over tcp.
---
 clusterd | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 75 insertions(+)

diff --git a/clusterd b/clusterd
index 1cf5f3f..b9e834e 100755
--- a/clusterd
+++ b/clusterd
@@ -582,6 +582,7 @@ our %UDP_HANDLER=
 	'make-automaps' => \&udp_rx_make_automaps,
 	'log'  => \&udp_rx_log,
 	'exec' => \&udp_rx_exec,
+	'push' => \&udp_rx_push,
 );
 
 sub udp_message {
@@ -1667,6 +1668,79 @@ sub cmd_msg {
 	}
 }
 
+sub udp_rx_push {
+	my ($ip,$st_want)=@_;
+
+	my $filename=$st_want->name;
+	my $tmp_filename="$filename.tmp";
+
+	$ip eq $my_ip and return;
+
+	my $st_is=Donald::FileInfo->lstat($st_want->name);
+
+	unless ($st_want->type eq 'F') {
+		warn "$filename: type ".$st_want->type." not yet implemented\n";
+		return;
+	}
+
+	if ($st_is
+		&& $st_is->type eq 'F'
+		&& $st_is->size == $st_want->size
+		&& $st_is->mtime == $st_want->mtime
+		&& $st_is->uid == $st_want->uid
+		&& $st_is->gid == $st_want->gid
+		&& $st_is->perm == $st_want->perm
+	) {
+		warn "$filename: already okay\n";
+		return;
+	}
+
+	if ($st_want->size==0) {
+		my $fh=IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0);
+		defined $fh or return warn "$tmp_filename: $!\n";
+		$fh->close;
+		chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
+		chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
+		rename($tmp_filename,$filename);
+		utime($st_want->mtime,$st_want->mtime,$filename);
+		warn "installed (empty) $filename\n";
+		return;
+	}
+
+	my $s;
+	$s=My::Select::INET::connect_tcp($ip,$CLP_PORT,5,sub {
+		$! and return warn "$ip: $!\n";
+		send_tcp_cp($s,sub {
+			$! and return warn "$ip: $!\n";
+			my $fh = IO::File->new($tmp_filename,O_WRONLY|O_CREAT,0);
+			defined $fh or return warn "$tmp_filename: $!\n";
+
+			my $cb;
+			my $bytes=$st_want->size;
+			$cb=sub {
+				# note, we need to break the circular references $cb of our caller, if no longer needed
+				my ($buf)=@_;
+				if ($!) { warn "$ip: $!\n";$cb=undef;return; }
+				if (length($buf)==0) { warn "$ip: EOF\n";$cb=undef;return;}
+				print $fh $buf;
+				$bytes-=length($buf);
+				if ($bytes>0) {
+					My::Select::INET::read_with_timeout($s,$cb,5);
+					return;
+				}
+				$cb=undef;
+				close $fh;
+				chown $st_want->uid,$st_want->gid,$tmp_filename or return warn "$tmp_filename: $!\n";
+				chmod $st_want->perm,$tmp_filename or return warn "$tmp_filename: $!\n";
+				rename($tmp_filename,$filename) or return warn "$filename: $!\n";
+				utime($st_want->mtime,$st_want->mtime,$filename);
+				warn "installed $filename\n";
+			};
+			My::Select::INET::read_with_timeout($s,$cb,5);
+		},5,'PULL',$st_want);
+	});
+}
+
 sub cmd_push {
 	my @files=@_;
 	for my $filename (@files) {
@@ -1679,6 +1753,7 @@ sub cmd_push {
 		my $st=Donald::FileInfo->lstat($filename);
 		defined $st or die "$filename: $!\n";
 		$st->type eq 'F' or die "$filename: only plain files currently supported\n";
+		open my $test,'<',$filename or die "$filename: $!\n";
 		udp_broadcast_message($donald_s,'push',$my_ip,$st);
 	}
 }