diff --git a/clusterd/clusterd b/clusterd/clusterd index ff50cfd..cab912a 100755 --- a/clusterd/clusterd +++ b/clusterd/clusterd @@ -255,14 +255,6 @@ use IO::Socket::INET; our $UDP_MAX = 1472; -our $SOL_SOCKET=1; -our $SO_ERROR=4; - -sub get_socket_error { - my ($s)=@_; - return unpack('i',getsockopt($s,$SOL_SOCKET,$SO_ERROR)); -} - sub new { # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) my ($class,@args)=@_; our $socket=new IO::Socket::INET (@args) or return undef; @@ -296,98 +288,6 @@ sub receive_data { My::Select::reader($$self,$receive_data_cb); } -# send_tcp($socket,$data,$timeout,$cb); -# -# send data asynchronously over noblocking tcp socket -# call callback when done ($!=0) or on error ($! set) -# -# all arguments required -# -sub send_tcp { - my ($s,$data,$timeout,$cb)=@_; - my $len=$s->send($data,0); - defined $len or return $cb->(); - if ($len==length($data)) { - $!=0; - $cb->(); - return; - } - my $pos=$len; - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(); - }; - my $cb_write=sub { - My::Select::timeout_cancel($cb_tmo); - my $len=send($s,substr($data,$pos),0); - defined $len or return $cb->(); - if ($len==length($data)-$pos) { - $!=0; - $cb->(); - return; - } - $pos+=$len; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer_requeue(); - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer($s,$cb_write); -} - -# $socket = connect_tcp ($ip,$port,$timeout,$cb) -# -# asynchronously connect to tcp socket. -# call callback when done or on error (with $! set) -# -# all arguments required -# -sub connect_tcp { - my ($ip,$port,$timeout,$cb)=@_; - - my $s=new IO::Socket::INET (PeerAddr=>$ip,PeerPort=>$port,Blocking=>0); - defined $s or return $cb->(); - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(); - }; - my $cb_write=sub { - My::Select::timeout_cancel($cb_tmo); - $!=get_socket_error($s); - $cb->(); - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::writer($s,$cb_write); - return $s; -} - -# read_with_timeout($socket,$callback,$timeout) -# -# asynchronously read from tcp socket. -# -sub read_with_timeout { - my ($s,$cb,$timeout)=@_; - my $cb_tmo=sub { - My::Select::cancel_handle($s); - $!=110; - $cb->(undef); - }; - my $cb_read=sub { - My::Select::timeout_cancel($cb_tmo); - my $buf=''; - my $l=sysread($s,$buf,102400,0); - if (!defined $l) { - $cb->(undef); - } else { - $!=0; - $cb->($buf); - } - }; - My::Select::timeout($timeout,$cb_tmo); - My::Select::reader($s,$cb_read) -} - #-------------------------------------- package My::Cluster::Updown; @@ -629,10 +529,7 @@ use IO::Socket::INET; use Data::Dumper; use Digest::MD5; -our $UDP_MAX=1472; our $UDP_PORT=234; -our $BC_RATE = 8; # packets per second broadcast -our $TCP_TIMEOUT=30; # default timeout for tcp processing our (%options); # RUN OPTIONS @@ -1190,18 +1087,6 @@ sub clp_rx_CMD { close $socket; } -# send_tcp_cp($socket,$cb,$timeout,@args) -# -# send a cluster protocoll message over an async tcp socket. -# -# assume $CLUSTER_PW is valid -# -sub send_tcp_cp { - my ($s,$cb,$timeout,@args)=@_; - my $data=sign($CLUSTER_PW,encode(@args)); - My::Select::INET::send_tcp($s,pack('n',length($data)).$data,$timeout,$cb); -} - sub send_tcp_cp_sync { my ($s, @args) = @_; my $data = sign($CLUSTER_PW, encode(@args));