diff --git a/clusterd b/clusterd index e813978..8d865b4 100755 --- a/clusterd +++ b/clusterd @@ -10,12 +10,220 @@ our $REVISION='1.108'; #use lib ('/home/buczek/cluster/Donald/blib/lib'); use Donald::Tools qw(encode sign check_sign decode); -use Donald::Select; use Donald::FileInfo; -use Donald::Select::INET; use POSIX; use IO::Pipe; +#-------------------------------------- +package Donald::Select; + +use warnings; +use strict; + +our $VERSION = '1.00'; + +use Donald::Callback 1.01 ; +use Donald::Tools; + +our $time=Donald::Tools::uptime(); + +sub Donald::Select::time +{ + return $time; +} + +#----------------------------------------- + +our @TIMER=(); # ( [duetime,cb] , ) sorted by time +our $active_timer_cb; + +sub timeout # cb=Select::timeout(seconds,cb) or cb=Select::Timeout(seconds,subref [,args,...]) +{ + my ($delta,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + + my $duetime=$time+$delta; + @TIMER=sort {$a->[0] <=> $b->[0]} ( [$duetime,$cb] , @TIMER ); + return $cb; +} + +sub timeout_cancel # cb=Select::timeout_cancel(cb) +{ + my ($cb)=@_; + @TIMER=grep {$_->[1] != $cb && $_->[1]->[0] != $cb} @TIMER; + return $cb; +} + +sub timeout_requeue {timeout(shift,$active_timer_cb); } # Select::timeout_requeue(seconds) + +#------------------------------------ + +our @READER; # ( [Handle,cb] , ... ) +our @WRITER; +our @EXCEPT; +our $active_io; # [Handle,cb] + +sub reader # cb = Select::reader(Handle,cb) +{ + my ($handle,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + push @READER,[$handle,$cb]; + return $cb; +} + +sub writer # cb = Select::writer(Handle,cb) +{ + my ($handle,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + push @WRITER,[$handle,$cb]; + return $cb; +} + +sub except # cb = Select::except(Handle,cb) +{ + my ($handle,$cb_or_sub,@args)=@_; + my $cb=new Donald::Callback($cb_or_sub,@args); + push @EXCEPT,[$handle,$cb]; + return $cb; +} + +sub reader_requeue {push @READER,$active_io} +sub writer_requeue {push @WRITER,$active_io} +sub except_requeue {push @EXCEPT,$active_io} + +sub cancel # $cb = Select::cancel([cb]) +{ + my ($cb)=@_; + defined $cb or $cb=$active_io->[1]; + @READER=grep {$_->[1] != $cb} @READER; + @WRITER=grep {$_->[1] != $cb} @WRITER; + @EXCEPT=grep {$_->[1] != $cb} @EXCEPT; +} + +sub cancel_handle +{ + my ($handle)=@_; + @READER=grep {$_->[0] != $handle} @READER; + @WRITER=grep {$_->[0] != $handle} @WRITER; + @EXCEPT=grep {$_->[0] != $handle} @EXCEPT; +} + +#-------------------------------------------- + +sub heartbeat +{ + $time++; + while (@TIMER && $TIMER[0]->[0]<=$time) { + $active_timer_cb=(shift @TIMER)->[1]; + $active_timer_cb->call(); + } + $active_timer_cb=undef; +} + +sub run +{ + while (1) { + $time=Donald::Tools::uptime(); + while (@TIMER && $TIMER[0]->[0]<=$time) { + $active_timer_cb=(shift @TIMER)->[1]; + $active_timer_cb->call(); + } + $active_timer_cb=undef; + + my ($rvec,$wvec,$evec)=('','',''); + + for (@READER) { vec($rvec,$_->[0]->fileno,1)=1 } ; + for (@WRITER) { vec($wvec,$_->[0]->fileno,1)=1 } ; + for (@EXCEPT) { vec($evec,$_->[0]->fileno,1)=1 } ; + + my $ready=select($rvec,$wvec,$evec,1); + if ($ready>0) { + for (my $i=0;$i<@READER;$i++) { + if (vec($rvec,$READER[$i]->[0]->fileno,1)) { + $active_io=splice @READER,$i,1; + $active_io->[1]->call(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@WRITER;$i++) { + if (vec($wvec,$WRITER[$i]->[0]->fileno,1)) { + $active_io=splice @WRITER,$i,1; + $active_io->[1]->call(); + $active_io=undef; + last; + } + } + for (my $i=0;$i<@EXCEPT;$i++) { + if (vec($evec,$EXCEPT[$i]->[0]->fileno,1)) { + $active_io=splice @EXCEPT,$i,1; + $active_io->[1]->call(); + $active_io=undef; + last; + } + } + } + } +} + +1; +#-------------------------------------- +package Donald::Select::INET ; + +use warnings; +use strict; + +use Carp; +use IO::Socket::INET; +use Digest::MD5; +use Storable; + +our $VERSION = '1.00'; + +our $UDP_MAX=1472; # for broadcast on alphas + + +sub new # ( Proto=>'udp',Broadcast=>1,LocalPort=>$UDP_PORT ) +{ + my ($class,@args)=@_; + our $socket=new IO::Socket::INET (@args) or return undef; + return bless \$socket,$class; +} + + +sub send_data +{ + my ($self,$ip,$port,$data)=@_; + my $ip_address=inet_aton($ip); + unless (defined $ip_address) {carp("can't resolve $ip\n");return undef} + unless (length($data)<=$UDP_MAX) {carp("message to long\n");return undef} + $$self->send($data,0,pack_sockaddr_in($port,$ip_address)) or carp "$!\n"; +} + + +sub reader +{ + my ($self,$sub,@args)=@_; + Donald::Select::reader($$self,$sub,@args); +} + +sub receive_data +{ + my ($self,$sub,@args)=@_; + Donald::Select::reader($$self,\&receive_data_cb,$self,$sub,@args); +} + +sub receive_data_cb +{ + my ($self,$sub,@args)=@_; + my $data; + my $peer = $$self->recv($data,$UDP_MAX); + my ($udp_peer_port,$peer_iaddr)=unpack_sockaddr_in($peer); + my $udp_peer_addr=inet_ntoa($peer_iaddr); + Donald::Select::reader_requeue(); + $sub->($data,$udp_peer_addr,$udp_peer_port,@args); +} + #-------------------------------------- package My::Cluster::Updown;