diff --git a/clusterd b/clusterd index b9e834e..3b44b15 100755 --- a/clusterd +++ b/clusterd @@ -1298,7 +1298,7 @@ sub run_cmd { our $CLP_PORT=235; our $clp_listen_socket; -our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF); +our %CLP_HANDLER=('CMD'=>\&clp_rx_CMD,'LSOF'=>\&clp_rx_LSOF,'PULL'=>\&clp_rx_PULL); sub clp_init { $clp_listen_socket=new IO::Socket::INET(LocalPort=>$CLP_PORT,Proto=>'tcp',Listen=>1,ReuseAddr=>1); @@ -1668,6 +1668,46 @@ sub cmd_msg { } } +our $SYS_SENDFILE=40; # /usr/include/asm/unistd_64.h + +sub sendfile { + my ($out_fd,$in_fd,$offset,$count)=@_; + my $ret=syscall($SYS_SENDFILE,fileno($out_fd),fileno($in_fd),$offset,$count); + return $ret<0 ? undef : $ret; +} + +sub clp_rx_PULL { + my ($s,$st_want)=@_; + + my $st_is=Donald::FileInfo->lstat($st_want->name); + if (!defined $st_is or $st_is->type ne 'F' or $st_is->size != $st_want->size or $st_is->mtime != $st_want->mtime) { + warn $st_want->name." requested by ".$s->peerhost.": no longer available\n"; + return; + } + my $fh; + unless (open $fh,'<',$st_want->name) { + warn $st_want->name.": $!\n"; + return; + } + my $bytes=$st_is->size; + my $cb_tmo=sub { My::Select::cancel_handle($s); }; + my $cb_write=sub { + my $l=sendfile($s,$fh,0,$bytes); + unless (defined $l) { + warn "$!"; + return; + } + $bytes-=$l; + if ($bytes) { + My::Select::writer_requeue if $bytes; + } else { + close($s); + } + }; + My::Select::timeout(5,$cb_tmo); + My::Select::writer($s,$cb_write); +} + sub udp_rx_push { my ($ip,$st_want)=@_;