From fee25ebf0a361a4ec6395240a1d8e30fee80029a Mon Sep 17 00:00:00 2001
From: Donald Buczek <buczek@molgen.mpg.de>
Date: Wed, 6 Dec 2017 17:20:51 +0100
Subject: [PATCH] Implement PULL in daemon

Implement the server side to pull a file over tcp.
---
 clusterd | 42 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 41 insertions(+), 1 deletion(-)

diff --git a/clusterd b/clusterd
index b9e834e..3b44b15 100755
--- a/clusterd
+++ b/clusterd
@@ -1298,7 +1298,7 @@ sub run_cmd {
 our $CLP_PORT=235;
 our $clp_listen_socket;
 
-our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF);
+our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF,'PULL'=>\&clp_rx_PULL);
 
 sub clp_init {
 	$clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1);
@@ -1668,6 +1668,46 @@ sub cmd_msg {
 	}
 }
 
+our $SYS_SENDFILE=40;	# /usr/include/asm/unistd_64.h
+
+sub sendfile {
+	my ($out_fd,$in_fd,$offset,$count)=@_;
+	my $ret=syscall($SYS_SENDFILE,fileno($out_fd),fileno($in_fd),$offset,$count);
+	return $ret<0 ? undef : $ret;
+}
+
+sub clp_rx_PULL {
+	my ($s,$st_want)=@_;
+
+	my $st_is=Donald::FileInfo->lstat($st_want->name);
+	if (!defined $st_is or $st_is->type ne 'F' or $st_is->size != $st_want->size or $st_is->mtime != $st_want->mtime) {
+		warn $st_want->name." requested by ".$s->peerhost.": no longer available\n";
+		return;
+	}
+	my $fh;
+	unless (open $fh,'<',$st_want->name) {
+		warn $st_want->name.": $!\n";
+		return;
+	}
+	my $bytes=$st_is->size;
+	my $cb_tmo=sub { My::Select::cancel_handle($s); };
+	my $cb_write=sub {
+		my $l=sendfile($s,$fh,0,$bytes);
+		unless (defined $l) {
+			warn "$!";
+			return;
+		}
+		$bytes-=$l;
+		if ($bytes) {
+			My::Select::writer_requeue if $bytes;
+		} else {
+			close($s);
+		}
+	};
+	My::Select::timeout(5,$cb_tmo);
+	My::Select::writer($s,$cb_write);
+}
+
 sub udp_rx_push {
 	my ($ip,$st_want)=@_;