Skip to content

Commit

Permalink
Remove My::Callback
Browse files Browse the repository at this point in the history
Use refenerces to subs as callback arguments. If the caller wants to
pass additional arguments, he can use closures.
  • Loading branch information
donald committed Dec 7, 2017
1 parent 7ed96d6 commit 3901aa7
Showing 1 changed file with 29 additions and 38 deletions.
67 changes: 29 additions & 38 deletions clusterd
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ use Donald::FileInfo;
use POSIX;
use IO::Pipe;

#--------------------------------------
#-------------------------------------
package My::Select;

use Donald::Callback 1.01 ;
use Donald::Tools;

our $time=Donald::Tools::uptime();
Expand All @@ -30,11 +29,9 @@ sub My::Select::time
our @TIMER=(); # ( [duetime,cb] , ... ) sorted by time
our $active_timer_cb;

sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,subref [,args,...])
sub timeout # cb=Select::timeout(seconds,cb)
{
my ($delta,$cb_or_sub,@args)=@_;
my $cb=new Donald::Callback($cb_or_sub,@args);

my ($delta,$cb)=@_;
my $duetime=$time+$delta;
@TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER );
return $cb;
Expand All @@ -43,7 +40,7 @@ sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,sub
sub timeout_cancel # cb=Select::timeout_cancel(cb)
{
my ($cb)=@_;
@TIMER=grep {$_->[1] != $cb && $_->[1]->[0] != $cb} @TIMER;
@TIMER=grep {$_->[1] != $cb} @TIMER;
return $cb;
}

Expand All @@ -58,24 +55,21 @@ our $active_io; # [Handle,cb]

sub reader # cb = Select::reader(Handle,cb)
{
my ($handle,$cb_or_sub,@args)=@_;
my $cb=new Donald::Callback($cb_or_sub,@args);
my ($handle,$cb)=@_;
push @READER,[$handle,$cb];
return $cb;
}

sub writer # cb = Select::writer(Handle,cb)
{
my ($handle,$cb_or_sub,@args)=@_;
my $cb=new Donald::Callback($cb_or_sub,@args);
my ($handle,$cb)=@_;
push @WRITER,[$handle,$cb];
return $cb;
}

sub except # cb = Select::except(Handle,cb)
{
my ($handle,$cb_or_sub,@args)=@_;
my $cb=new Donald::Callback($cb_or_sub,@args);
my ($handle,$cb)=@_;
push @EXCEPT,[$handle,$cb];
return $cb;
}
Expand Down Expand Up @@ -107,7 +101,7 @@ sub run
$time=Donald::Tools::uptime();
while (@TIMER && $TIMER[0]->[0]<=$time) {
$active_timer_cb=(shift @TIMER)->[1];
$active_timer_cb->call();
$active_timer_cb->();
}
$active_timer_cb=undef;

Expand All @@ -122,23 +116,23 @@ sub run
for (my $i=0;$i<@READER;$i++) {
if (vec($rvec,$READER[$i]->[0]->fileno,1)) {
$active_io=splice @READER,$i,1;
$active_io->[1]->call();
$active_io->[1]->();
$active_io=undef;
last;
}
}
for (my $i=0;$i<@WRITER;$i++) {
if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) {
$active_io=splice @WRITER,$i,1;
$active_io->[1]->call();
$active_io->[1]->();
$active_io=undef;
last;
}
}
for (my $i=0;$i<@EXCEPT;$i++) {
if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) {
$active_io=splice @EXCEPT,$i,1;
$active_io->[1]->call();
$active_io->[1]->();
$active_io=undef;
last;
}
Expand All @@ -157,7 +151,6 @@ use Storable;

our $UDP_MAX=1472; # for broadcast on alphas


sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT )
{
my ($class,@args)=@_;
Expand All @@ -178,25 +171,23 @@ sub send_data

sub reader
{
my ($self,$sub,@args)=@_;
My::Select::reader($$self,$sub,@args);
my ($self,$sub)=@_;
My::Select::reader($$self,$sub);
}

sub receive_data
{
my ($self,$sub,@args)=@_;
My::Select::reader($$self,\&receive_data_cb,$self,$sub,@args);
}
my ($self,$cb,@args)=@_;

sub receive_data_cb
{
my ($self,$sub,@args)=@_;
my $data;
my $peer = $$self->recv($data,$UDP_MAX);
my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer);
my $udp_peer_addr=inet_ntoa($peer_iaddr);
My::Select::reader_requeue();
$sub->($data,$udp_peer_addr,$udp_peer_port,@args);
my $receive_data_cb=sub {
my $data;
my $peer = $$self->recv($data,$UDP_MAX);
my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer);
my $udp_peer_addr=inet_ntoa($peer_iaddr);
My::Select::reader_requeue();
$cb->($data,$udp_peer_addr,$udp_peer_port);
};
My::Select::reader($$self,$receive_data_cb);
}

#--------------------------------------
Expand Down Expand Up @@ -421,7 +412,7 @@ sub connect_request {

my $peernode=$socket->peerhost;
my $buffer='';
My::Select::reader($socket,\&receive,$socket,$peernode,\$buffer);
My::Select::reader($socket,sub{receive($socket,$peernode,\$buffer)});
# warn "$peernode: connect\n";
}

Expand Down Expand Up @@ -1015,7 +1006,7 @@ sub mgmt_connect_request {

### warn "accepted mgmt connection from $peernode\n";

My::Select::reader($socket,\&mgmt_receive,$socket);
My::Select::reader($socket,sub{mgmt_receive($socket)});
$mgmt_sockets{$socket}=$socket;
$socket->print("clusterd ".version_info()." stupid console\n");
$socket->print("For historical messages, grep \"clusterd\" from /var/log/messages on $STAT_TARGET (or \"tail -f /var/log/messages |grep cluster\")\n");
Expand Down Expand Up @@ -1230,7 +1221,7 @@ sub clp_connect_request {

my $buffer='';

My::Select::reader($socket,\&clp_receive,$socket,\$buffer);
My::Select::reader($socket,sub{clp_receive($socket,\$buffer)});
$clp_sockets{$socket}=$socket;
}

Expand Down Expand Up @@ -1346,7 +1337,7 @@ sub init_area {
warn "I am area router for $AREA_ROUTER{$my_hostname}\n";

$area_socket=new IO::Socket::INET (Proto=>'udp',LocalPort=>$UDP_PORT+1) or die "$!\n";
My::Select::reader($area_socket,\&area_message,$area_socket);
My::Select::reader($area_socket,sub{area_message($area_socket)});
}

sub area_message {
Expand Down Expand Up @@ -1470,7 +1461,7 @@ sub exec_at {
my $pbuffer='';
my $olbuffer='';
my $elbuffer='';
My::Select::reader($s,\&cmd_rx,$host,$s,\$pbuffer,\$olbuffer,\$elbuffer);
My::Select::reader($s,sub{cmd_rx($host,$s,\$pbuffer,\$olbuffer,\$elbuffer)});
$slave=1;
My::Select::run() if $slave;;
}
Expand All @@ -1489,7 +1480,7 @@ sub lsof {
clp_send_message($s,'LSOF',$pattern);
my $pbuffer='';
my $olbuffer='';
My::Select::reader($s,\&lsof_rx,$host,$s,\$pbuffer,\$olbuffer);
My::Select::reader($s,sub{lsof_rx($host,$s,\$pbuffer,\$olbuffer)});
$slave++;
}
My::Select::run() if $slave;;
Expand Down

0 comments on commit 3901aa7

Please sign in to comment.