From 26bc5ad358a00afb30318a098ea1bafa9e2f7018 Mon Sep 17 00:00:00 2001 From: Donald Buczek Date: Sat, 2 Dec 2017 11:12:43 +0100 Subject: [PATCH] Import Donald::Select into script The design of Donald::Select makes use of Donald::Callback objects, which are objects conainting a sub and call arguments. However, because of the pattern sub do_something_later() { my ($cb_or_sub,@args)=@_; $store_callback_somehere=new Donald::Callback($cb_or_sub,@args); } do_something_later(\&callback,$arg1,$arg2) the caller doesn't have a reference to the Donald::Callback object, which makes its diffucult to identify it, e.g. to cancel the callback. We want to change the design to accept only references to subs as callbacks. Instead of passing arguments, we exepect the caller to make use of closures to pass data to the callback if needed. sub do_something_later() { my ($cb)=@_; $store_callback_somewhere=$cb; } do_something_later(sub{callback($arg1,$args)}); Instead of changing the API of Donald::Select, we import the code directly into clusterd to make the modifications here. --- clusterd | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 210 insertions(+), 2 deletions(-) diff --git a/clusterd b/clusterd index e813978..8d865b4 100755 --- a/clusterd +++ b/clusterd @@ -10,12 +10,220 @@ our $REVISION='1.108'; #use lib ('/home/buczek/cluster/Donald/blib/lib'); use Donald::Tools qw(encode sign check_sign decode); -use Donald::Select; use Donald::FileInfo; -use Donald::Select::INET; use POSIX; use IO::Pipe; +#-------------------------------------- +package Donald::Select; + +use warnings; +use strict; + +our $VERSION = '1.00'; + +use Donald::Callback 1.01 ; +use Donald::Tools; + +our $time=Donald::Tools::uptime(); + +sub Donald::Select::time +{ + return $time; +} + +#----------------------------------------- + +our @TIMER=(); # ( [duetime,cb] , ) sorted by time +our $active_timer_cb; + +sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,subref [,args,...]) +{ + my ($delta,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + + my $duetime=$time+$delta; + @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); + return $cb; +} + +sub timeout_cancel # cb=Select::timeout_cancel(cb) +{ + my ($cb)=@_; + @TIMER=grep {$_->[1] != $cb && $_->[1]->[0] != $cb} @TIMER; + return $cb; +} + +sub timeout_requeue {timeout(shift,$active_timer_cb); } # Select::timeout_requeue(seconds) + +#------------------------------------ + +our @READER; # ( [Handle,cb] , ... ) +our @WRITER; +our @EXCEPT; +our $active_io; # [Handle,cb] + +sub reader # cb = Select::reader(Handle,cb) +{ + my ($handle,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + push @READER,[$handle,$cb]; + return $cb; +} + +sub writer # cb = Select::writer(Handle,cb) +{ + my ($handle,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + push @WRITER,[$handle,$cb]; + return $cb; +} + +sub except # cb = Select::except(Handle,cb) +{ + my ($handle,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + push @EXCEPT,[$handle,$cb]; + return $cb; +} + +sub reader_requeue {push @READER,$active_io} +sub writer_requeue {push @WRITER,$active_io} +sub except_requeue {push @EXCEPT,$active_io} + +sub cancel # $cb = Select::cancel([cb]) +{ + my ($cb)=@_; + defined $cb or $cb=$active_io->[1]; + @READER=grep {$_->[1] != $cb} @READER; + @WRITER=grep {$_->[1] != $cb} @WRITER; + @EXCEPT=grep {$_->[1] != $cb} @EXCEPT; +} + +sub cancel_handle +{ + my ($handle)=@_; + @READER=grep {$_->[0] != $handle} @READER; + @WRITER=grep {$_->[0] != $handle} @WRITER; + @EXCEPT=grep {$_->[0] != $handle} @EXCEPT; +} + +#-------------------------------------------- + +sub heartbeat +{ + $time++; + while (@TIMER && $TIMER[0]->[0]<=$time) { + $active_timer_cb=(shift @TIMER)->[1]; + $active_timer_cb->call(); + } + $active_timer_cb=undef; +} + +sub run +{ + while (1) { + $time=Donald::Tools::uptime(); + while (@TIMER && $TIMER[0]->[0]<=$time) { + $active_timer_cb=(shift @TIMER)->[1]; + $active_timer_cb->call(); + } + $active_timer_cb=undef; + + my ($rvec,$wvec,$evec)=('','',''); + + for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 } ; + for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 } ; + for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ; + + my $ready=select($rvec,$wvec,$evec,1); + if ($ready>0) { + for (my $i=0;$i<@READER;$i++) { + if (vec($rvec,$READER[$i]->[0]->fileno,1)) { + $active_io=splice @READER,$i,1; + $active_io->[1]->call(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@WRITER;$i++) { + if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) { + $active_io=splice @WRITER,$i,1; + $active_io->[1]->call(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@EXCEPT;$i++) { + if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) { + $active_io=splice @EXCEPT,$i,1; + $active_io->[1]->call(); + $active_io=undef; + last; + } + } + } + } +} + +1; +#-------------------------------------- +package Donald::Select::INET ; + +use warnings; +use strict; + +use Carp; +use IO::Socket::INET; +use Digest::MD5; +use Storable; + +our $VERSION = '1.00'; + +our $UDP_MAX=1472; # for broadcast on alphas + + +sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) +{ + my ($class,@args)=@_; + our $socket=new IO::Socket::INET (@args) or return undef; + return bless \$socket,$class; +} + + +sub send_data +{ + my ($self,$ip,$port,$data)=@_; + my $ip_address=inet_aton($ip); + unless (defined $ip_address) {carp("can't resolve $ip\n");return undef} + unless (length($data)<=$UDP_MAX) {carp("message to long\n");return undef} + $$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n"; +} + + +sub reader +{ + my ($self,$sub,@args)=@_; + Donald::Select::reader($$self,$sub,@args); +} + +sub receive_data +{ + my ($self,$sub,@args)=@_; + Donald::Select::reader($$self,\&receive_data_cb,$self,$sub,@args); +} + +sub receive_data_cb +{ + my ($self,$sub,@args)=@_; + my $data; + my $peer = $$self->recv($data,$UDP_MAX); + my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); + my $udp_peer_addr=inet_ntoa($peer_iaddr); + Donald::Select::reader_requeue(); + $sub->($data,$udp_peer_addr,$udp_peer_port,@args); +} + #-------------------------------------- package My::Cluster::Updown;