Skip to content

Commit

Permalink
Add functions for nonblocking tcp to My::Select::INET
Browse files Browse the repository at this point in the history
  • Loading branch information
donald committed Dec 7, 2017
1 parent e61ecda commit 01e9dc4
Showing 1 changed file with 101 additions and 0 deletions.
101 changes: 101 additions & 0 deletions clusterd
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ use IO::Socket::INET;

our $UDP_MAX=1472; # for broadcast on alphas

our $SOL_SOCKET=1;
our $SO_ERROR=4;

sub get_socket_error
{
my ($s)=@_;
return unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR));
}

sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT )
{
my ($class,@args)=@_;
Expand Down Expand Up @@ -187,6 +196,98 @@ sub receive_data
My::Select::reader($$self,$receive_data_cb);
}

# send_tcp($socket,$data,$timeout,$cb);
#
# send data asynchronously over noblocking tcp socket
# call callback when done ($!=0) or on error ($! set)
#
# all arguments required
#
sub send_tcp {
my ($s,$data,$timeout,$cb)=@_;
my $len=$s->send($data,0);
defined $len or return $cb->();
if ($len==length($data)) {
$!=0;
$cb->();
return;
}
my $pos=$len;
my $cb_tmo=sub {
My::Select::cancel_handle($s);
$!=110;
$cb->();
};
my $cb_write=sub {
My::Select::timeout_cancel($cb_tmo);
my $len=send($s,substr($data,$pos),0);
defined $len or return $cb->();
if ($len==length($data)-$pos) {
$!=0;
$cb->();
return;
}
$pos+=$len;
My::Select::timeout($timeout,$cb_tmo);
My::Select::writer_requeue();
};
My::Select::timeout($timeout,$cb_tmo);
My::Select::writer($s,$cb_write);
}

# $socket = connect_tcp ($ip,$port,$timeout,$cb)
#
# asynchronously connect to tcp socket.
# call callback when done or on error (with $! set)
#
# all arguments required
#
sub connect_tcp {
my ($ip,$port,$timeout,$cb)=@_;

my $s=new IO::Socket::INET (PeerAddr=>$ip,PeerPort=>$port,Blocking=>0);
defined $s or return $cb->();
my $cb_tmo=sub {
My::Select::cancel_handle($s);
$!=110;
$cb->();
};
my $cb_write=sub {
My::Select::timeout_cancel($cb_tmo);
$!=get_socket_error($s);
$cb->();
};
My::Select::timeout($timeout,$cb_tmo);
My::Select::writer($s,$cb_write);
return $s;
}

# read_with_timeout($socket,$callback,$timeout)
#
# asynchronously read from tcp socket.
#
sub read_with_timeout {
my ($s,$cb,$timeout)=@_;
my $cb_tmo=sub {
My::Select::cancel_handle($s);
$!=110;
$cb->(undef);
};
my $cb_read=sub {
My::Select::timeout_cancel($cb_tmo);
my $buf='';
my $l=sysread($s,$buf,102400,0);
if (!defined $l) {
$cb->(undef);
} else {
$!=0;
$cb->($buf);
}
};
My::Select::timeout($timeout,$cb_tmo);
My::Select::reader($s,$cb_read)
}

#--------------------------------------
package My::Cluster::Updown;

Expand Down

0 comments on commit 01e9dc4

Please sign in to comment.