diff --git a/clusterd b/clusterd index fd857e1..5d15339 100755 --- a/clusterd +++ b/clusterd @@ -14,10 +14,9 @@ use Donald::FileInfo; use POSIX; use IO::Pipe; -#-------------------------------------- +#------------------------------------- package My::Select; -use Donald::Callback 1.01 ; use Donald::Tools; our $time=Donald::Tools::uptime(); @@ -30,11 +29,9 @@ sub My::Select::time our @TIMER=(); # ( [duetime,cb] , ... ) sorted by time our $active_timer_cb; -sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,subref [,args,...]) +sub timeout # cb=Select::timeout(seconds,cb) { - my ($delta,$cb_or_sub,@args)=@_; - my $cb=new Donald::Callback($cb_or_sub,@args); - + my ($delta,$cb)=@_; my $duetime=$time+$delta; @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); return $cb; @@ -43,7 +40,7 @@ sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,sub sub timeout_cancel # cb=Select::timeout_cancel(cb) { my ($cb)=@_; - @TIMER=grep {$_->[1] != $cb && $_->[1]->[0] != $cb} @TIMER; + @TIMER=grep {$_->[1] != $cb} @TIMER; return $cb; } @@ -58,24 +55,21 @@ our $active_io; # [Handle,cb] sub reader # cb = Select::reader(Handle,cb) { - my ($handle,$cb_or_sub,@args)=@_; - my $cb=new Donald::Callback($cb_or_sub,@args); + my ($handle,$cb)=@_; push @READER,[$handle,$cb]; return $cb; } sub writer # cb = Select::writer(Handle,cb) { - my ($handle,$cb_or_sub,@args)=@_; - my $cb=new Donald::Callback($cb_or_sub,@args); + my ($handle,$cb)=@_; push @WRITER,[$handle,$cb]; return $cb; } sub except # cb = Select::except(Handle,cb) { - my ($handle,$cb_or_sub,@args)=@_; - my $cb=new Donald::Callback($cb_or_sub,@args); + my ($handle,$cb)=@_; push @EXCEPT,[$handle,$cb]; return $cb; } @@ -107,7 +101,7 @@ sub run $time=Donald::Tools::uptime(); while (@TIMER && $TIMER[0]->[0]<=$time) { $active_timer_cb=(shift @TIMER)->[1]; - $active_timer_cb->call(); + $active_timer_cb->(); } $active_timer_cb=undef; @@ -122,7 +116,7 @@ sub run for (my $i=0;$i<@READER;$i++) { if (vec($rvec,$READER[$i]->[0]->fileno,1)) { $active_io=splice @READER,$i,1; - $active_io->[1]->call(); + $active_io->[1]->(); $active_io=undef; last; } @@ -130,7 +124,7 @@ sub run for (my $i=0;$i<@WRITER;$i++) { if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) { $active_io=splice @WRITER,$i,1; - $active_io->[1]->call(); + $active_io->[1]->(); $active_io=undef; last; } @@ -138,7 +132,7 @@ sub run for (my $i=0;$i<@EXCEPT;$i++) { if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) { $active_io=splice @EXCEPT,$i,1; - $active_io->[1]->call(); + $active_io->[1]->(); $active_io=undef; last; } @@ -157,7 +151,6 @@ use Storable; our $UDP_MAX=1472; # for broadcast on alphas - sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) { my ($class,@args)=@_; @@ -178,25 +171,23 @@ sub send_data sub reader { - my ($self,$sub,@args)=@_; - My::Select::reader($$self,$sub,@args); + my ($self,$sub)=@_; + My::Select::reader($$self,$sub); } sub receive_data { - my ($self,$sub,@args)=@_; - My::Select::reader($$self,\&receive_data_cb,$self,$sub,@args); -} + my ($self,$cb,@args)=@_; -sub receive_data_cb -{ - my ($self,$sub,@args)=@_; - my $data; - my $peer = $$self->recv($data,$UDP_MAX); - my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); - my $udp_peer_addr=inet_ntoa($peer_iaddr); - My::Select::reader_requeue(); - $sub->($data,$udp_peer_addr,$udp_peer_port,@args); + my $receive_data_cb=sub { + my $data; + my $peer = $$self->recv($data,$UDP_MAX); + my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); + my $udp_peer_addr=inet_ntoa($peer_iaddr); + My::Select::reader_requeue(); + $cb->($data,$udp_peer_addr,$udp_peer_port); + }; + My::Select::reader($$self,$receive_data_cb); } #-------------------------------------- @@ -421,7 +412,7 @@ sub connect_request { my $peernode=$socket->peerhost; my $buffer=''; - My::Select::reader($socket,\&receive,$socket,$peernode,\$buffer); + My::Select::reader($socket,sub{receive($socket,$peernode,\$buffer)}); # warn "$peernode: connect\n"; } @@ -1015,7 +1006,7 @@ sub mgmt_connect_request { ### warn "accepted mgmt connection from $peernode\n"; - My::Select::reader($socket,\&mgmt_receive,$socket); + My::Select::reader($socket,sub{mgmt_receive($socket)}); $mgmt_sockets{$socket}=$socket; $socket->print("clusterd ".version_info()." stupid console\n"); $socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n"); @@ -1230,7 +1221,7 @@ sub clp_connect_request { my $buffer=''; - My::Select::reader($socket,\&clp_receive,$socket,\$buffer); + My::Select::reader($socket,sub{clp_receive($socket,\$buffer)}); $clp_sockets{$socket}=$socket; } @@ -1346,7 +1337,7 @@ sub init_area { warn "I am area router for $AREA_ROUTER{$my_hostname}\n"; $area_socket=new IO::Socket::INET (Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n"; - My::Select::reader($area_socket,\&area_message,$area_socket); + My::Select::reader($area_socket,sub{area_message($area_socket)}); } sub area_message { @@ -1470,7 +1461,7 @@ sub exec_at { my $pbuffer=''; my $olbuffer=''; my $elbuffer=''; - My::Select::reader($s,\&cmd_rx,$host,$s,\$pbuffer,\$olbuffer,\$elbuffer); + My::Select::reader($s,sub{cmd_rx($host,$s,\$pbuffer,\$olbuffer,\$elbuffer)}); $slave=1; My::Select::run() if $slave;; } @@ -1489,7 +1480,7 @@ sub lsof { clp_send_message($s,'LSOF',$pattern); my $pbuffer=''; my $olbuffer=''; - My::Select::reader($s,\&lsof_rx,$host,$s,\$pbuffer,\$olbuffer); + My::Select::reader($s,sub{lsof_rx($host,$s,\$pbuffer,\$olbuffer)}); $slave++; } My::Select::run() if $slave;;