From 01e9dc44124e5502af271cfab4678b6697a5f1c0 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Wed, 6 Dec 2017 17:11:15 +0100 Subject: [PATCH] Add functions for nonblocking tcp to My::Select::INET --- clusterd | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/clusterd b/clusterd index 06989b6..8ee7aea 100755 --- a/clusterd +++ b/clusterd @@ -148,6 +148,15 @@ use IO::Socket::INET; our $UDP_MAX=1472; # for broadcast on alphas +our $SOL_SOCKET=1; +our $SO_ERROR=4; + +sub get_socket_error +{ + my ($s)=@_; + return unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR)); +} + sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) { my ($class,@args)=@_; @@ -187,6 +196,98 @@ sub receive_data My::Select::reader($$self,$receive_data_cb); } +# send_tcp($socket,$data,$timeout,$cb); +# +# send data asynchronously over noblocking tcp socket +# call callback when done ($!=0) or on error ($! set) +# +# all arguments required +# +sub send_tcp { + my ($s,$data,$timeout,$cb)=@_; + my $len=$s->send($data,0); + defined $len or return $cb->(); + if ($len==length($data)) { + $!=0; + $cb->(); + return; + } + my $pos=$len; + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(); + }; + my $cb_write=sub { + My::Select::timeout_cancel($cb_tmo); + my $len=send($s,substr($data,$pos),0); + defined $len or return $cb->(); + if ($len==length($data)-$pos) { + $!=0; + $cb->(); + return; + } + $pos+=$len; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer_requeue(); + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer($s,$cb_write); +} + +# $socket = connect_tcp ($ip,$port,$timeout,$cb) +# +# asynchronously connect to tcp socket. +# call callback when done or on error (with $! set) +# +# all arguments required +# +sub connect_tcp { + my ($ip,$port,$timeout,$cb)=@_; + + my $s=new IO::Socket::INET (PeerAddr=>$ip,PeerPort=>$port,Blocking=>0); + defined $s or return $cb->(); + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(); + }; + my $cb_write=sub { + My::Select::timeout_cancel($cb_tmo); + $!=get_socket_error($s); + $cb->(); + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::writer($s,$cb_write); + return $s; +} + +# read_with_timeout($socket,$callback,$timeout) +# +# asynchronously read from tcp socket. +# +sub read_with_timeout { + my ($s,$cb,$timeout)=@_; + my $cb_tmo=sub { + My::Select::cancel_handle($s); + $!=110; + $cb->(undef); + }; + my $cb_read=sub { + My::Select::timeout_cancel($cb_tmo); + my $buf=''; + my $l=sysread($s,$buf,102400,0); + if (!defined $l) { + $cb->(undef); + } else { + $!=0; + $cb->($buf); + } + }; + My::Select::timeout($timeout,$cb_tmo); + My::Select::reader($s,$cb_read) +} + #-------------------------------------- package My::Cluster::Updown;