Skip to content

Commit

Permalink
Import Donald::Select into script
Browse files Browse the repository at this point in the history
The design of Donald::Select makes use of Donald::Callback objects,
which are objects conainting a sub and call arguments. However,
because of the pattern

    sub do_something_later() {
        my ($cb_or_sub,@args)=@_;
            $store_callback_somehere=new Donald::Callback($cb_or_sub,@args);
        }

    do_something_later(\&callback,$arg1,$arg2)

the caller doesn't have a reference to the Donald::Callback object,
which makes its diffucult to identify it, e.g. to cancel the callback.

We want to change the design to accept only references to subs as
callbacks. Instead of passing arguments, we exepect the caller to make
use of closures to pass data to the callback if needed.

    sub do_something_later() {
        my ($cb)=@_;
        $store_callback_somewhere=$cb;
    }

    do_something_later(sub{callback($arg1,$args)});

Instead of changing the API of Donald::Select, we import the code
directly into clusterd to make the modifications here.
  • Loading branch information
donald committed Dec 7, 2017
1 parent 0f58943 commit 26bc5ad
Showing 1 changed file with 210 additions and 2 deletions.
212 changes: 210 additions & 2 deletions clusterd
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,220 @@ our $REVISION='1.108';
#use lib ('/home/buczek/cluster/Donald/blib/lib');

use Donald::Tools qw(encode sign check_sign decode);
use Donald::Select;
use Donald::FileInfo;
use Donald::Select::INET;
use POSIX;
use IO::Pipe;

#--------------------------------------
package Donald::Select;

use warnings;
use strict;

our $VERSION = '1.00';

use Donald::Callback 1.01 ;
use Donald::Tools;

our $time=Donald::Tools::uptime();

sub Donald::Select::time
{
return $time;
}

#-----------------------------------------

our @TIMER=(); # ( [duetime,cb] , ) sorted by time
our $active_timer_cb;

sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,subref [,args,...])
{
my ($delta,$cb_or_sub,@args)=@_;
my $cb=new Donald::Callback($cb_or_sub,@args);

my $duetime=$time+$delta;
@TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER );
return $cb;
}

sub timeout_cancel # cb=Select::timeout_cancel(cb)
{
my ($cb)=@_;
@TIMER=grep {$_->[1] != $cb && $_->[1]->[0] != $cb} @TIMER;
return $cb;
}

sub timeout_requeue {timeout(shift,$active_timer_cb); } # Select::timeout_requeue(seconds)

#------------------------------------

our @READER; # ( [Handle,cb] , ... )
our @WRITER;
our @EXCEPT;
our $active_io; # [Handle,cb]

sub reader # cb = Select::reader(Handle,cb)
{
my ($handle,$cb_or_sub,@args)=@_;
my $cb=new Donald::Callback($cb_or_sub,@args);
push @READER,[$handle,$cb];
return $cb;
}

sub writer # cb = Select::writer(Handle,cb)
{
my ($handle,$cb_or_sub,@args)=@_;
my $cb=new Donald::Callback($cb_or_sub,@args);
push @WRITER,[$handle,$cb];
return $cb;
}

sub except # cb = Select::except(Handle,cb)
{
my ($handle,$cb_or_sub,@args)=@_;
my $cb=new Donald::Callback($cb_or_sub,@args);
push @EXCEPT,[$handle,$cb];
return $cb;
}

sub reader_requeue {push @READER,$active_io}
sub writer_requeue {push @WRITER,$active_io}
sub except_requeue {push @EXCEPT,$active_io}

sub cancel # $cb = Select::cancel([cb])
{
my ($cb)=@_;
defined $cb or $cb=$active_io->[1];
@READER=grep {$_->[1] != $cb} @READER;
@WRITER=grep {$_->[1] != $cb} @WRITER;
@EXCEPT=grep {$_->[1] != $cb} @EXCEPT;
}

sub cancel_handle
{
my ($handle)=@_;
@READER=grep {$_->[0] != $handle} @READER;
@WRITER=grep {$_->[0] != $handle} @WRITER;
@EXCEPT=grep {$_->[0] != $handle} @EXCEPT;
}

#--------------------------------------------

sub heartbeat
{
$time++;
while (@TIMER && $TIMER[0]->[0]<=$time) {
$active_timer_cb=(shift @TIMER)->[1];
$active_timer_cb->call();
}
$active_timer_cb=undef;
}

sub run
{
while (1) {
$time=Donald::Tools::uptime();
while (@TIMER && $TIMER[0]->[0]<=$time) {
$active_timer_cb=(shift @TIMER)->[1];
$active_timer_cb->call();
}
$active_timer_cb=undef;

my ($rvec,$wvec,$evec)=('','','');

for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 } ;
for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 } ;
for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ;

my $ready=select($rvec,$wvec,$evec,1);
if ($ready>0) {
for (my $i=0;$i<@READER;$i++) {
if (vec($rvec,$READER[$i]->[0]->fileno,1)) {
$active_io=splice @READER,$i,1;
$active_io->[1]->call();
$active_io=undef;
last;
}
}
for (my $i=0;$i<@WRITER;$i++) {
if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) {
$active_io=splice @WRITER,$i,1;
$active_io->[1]->call();
$active_io=undef;
last;
}
}
for (my $i=0;$i<@EXCEPT;$i++) {
if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) {
$active_io=splice @EXCEPT,$i,1;
$active_io->[1]->call();
$active_io=undef;
last;
}
}
}
}
}

1;
#--------------------------------------
package Donald::Select::INET ;

use warnings;
use strict;

use Carp;
use IO::Socket::INET;
use Digest::MD5;
use Storable;

our $VERSION = '1.00';

our $UDP_MAX=1472; # for broadcast on alphas


sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT )
{
my ($class,@args)=@_;
our $socket=new IO::Socket::INET (@args) or return undef;
return bless \$socket,$class;
}


sub send_data
{
my ($self,$ip,$port,$data)=@_;
my $ip_address=inet_aton($ip);
unless (defined $ip_address) {carp("can't resolve $ip\n");return undef}
unless (length($data)<=$UDP_MAX) {carp("message to long\n");return undef}
$$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n";
}


sub reader
{
my ($self,$sub,@args)=@_;
Donald::Select::reader($$self,$sub,@args);
}

sub receive_data
{
my ($self,$sub,@args)=@_;
Donald::Select::reader($$self,\&receive_data_cb,$self,$sub,@args);
}

sub receive_data_cb
{
my ($self,$sub,@args)=@_;
my $data;
my $peer = $$self->recv($data,$UDP_MAX);
my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer);
my $udp_peer_addr=inet_ntoa($peer_iaddr);
Donald::Select::reader_requeue();
$sub->($data,$udp_peer_addr,$udp_peer_port,@args);
}

#--------------------------------------
package My::Cluster::Updown;

Expand Down

0 comments on commit 26bc5ad

Please sign in to comment.