#! /usr/bin/perl
use strict;
use warnings;
use DBI;
use IO::File;
use Time::Local;
use Data::Dumper;
our $hostname=`/bin/hostname -s`;
$? and exit 1;
chomp($hostname);
our $DAYS = 185;
our $deltatimeok = 24*60*60;
our $deltatimefail = 2*60*60;
our $BINDIR = "@BINDIR@";
our $LOGDIR = "@PB_LOGDIR@";
our $DATADIR = "@PB_VOLUMEDIR@";
our $SQLITEDBFILE = "@PB_SQLITEDB@";
our $SERVERDATFILE = "@PB_SERVERFILE@";
our $JOBSCFGFILE = "@PB_JOBSFILE@";
our $EXCLUDESFILE = "@PB_EXECLUDESFILE@";
our $RSYNCFILTERFILE = "@PB_RSYNCFILTERFILE@";
our $SSHPRIVATEKEY = "@PB_SSHPRIVATEKEY@";
$ENV{SQLITE_TMPDIR} = "@PB_SQLITEDIR@";
$ENV{TMPDIR} = "@PB_SQLITEDIR@";
sub USAGE {
return <<"EOF";
usage: $0 command [args]
status : show processes and the head of the enabled job queue
jobs : show all jobs
job JOB_NAME : show given job with all runs
monitor : "iview 1 pbackup status"
lsof : "lsof ${DATADIR}"
stat : show TOP 10 : source size, elapsed time, transferred size
stat expire : show oldest 20 jobs ( top of expire queue ) and expire date
stat weight : max weight (max(origin size) + sum(bytes transferred))
stat week : sum elapsed and transferred last 7 days
stat oversize : everything over 1TB source size for email
stat dusize : sizes of all active jobs of last run
stat zombies : old jobs which don't expire because the last run is kept
do_jobs : continuously run a backup process
expire : continuously run the expire process
balance : continuously run the balancer process
sql SQL : execute sql command
kill PID : let process exit after current task is done
amd_scan : update jobs NOW (also automatically done every hour)
verify : crosscheck volumes and database
move JOB_NAME Cnnn : move all runs of the named job to the given volume
bfix JOB_NAME : show some help to fix status after aborted balancer moves
fix_stat : fill in 'du' stat for all runs where we don't have the data yet
test : DO NOT USE: do something
disable JOB_NAME : disable job temporarily until next amd_scan
Manual: http://twiki.molgen.mpg.de/foswiki/PbackupForAdmins
EOF
}
################################### tools self contained
sub first_line_of_file {
my ($filename)=@_;
my $fh=new IO::File ($filename,'<') or die "$filename: $!\n";
my $line=$fh->getline();
chomp($line);
return $line;
}
sub df {
my ($path)=@_;
my $pid=open P,'-|';
defined $pid or die "$0: $!\n";
unless ($pid) {
exec 'df','-k',$path;
die "$0: $!\n";
}
my $l;
$l=<P>;
$l=<P>;
chomp $l;
my ($device,$blocks,$used,$avail,$perc,$ppath)=split " ",$l;
1 while ($l=readline(*P));
close P;
return $avail;
}
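# df() returns the "Available" column of `df -k`, i.e. free space in 1 KiB
# blocks; the free-space checks elsewhere ($free<2 after dividing by 1024^3,
# or df_cached()<5*1024*1024*1024) are therefore thresholds in TiB.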
our %FS_FREE_CACHE;
sub df_cached { # w/o arguments: clear cache
unless (@_) {
%FS_FREE_CACHE=();
return;
}
my ($fs)=@_;
$FS_FREE_CACHE{$fs}=df($fs) unless exists $FS_FREE_CACHE{$fs};
return $FS_FREE_CACHE{$fs};
}
sub du_bytes {
my ($dir)=@_;
my $pid=open my $p,'-|';
defined $pid or die "$!\n";
unless ($pid) {
chdir $dir or die "$dir: $!\n";
exec 'du','--block-size=1','-s','.';
die "$!\n";
}
my $out=<$p>;
close $p;
$? and die "du failed\n";
my ($bytes)=split " ",$out;
return $bytes;
}
sub scan_rsync_log {
my ($logfile)=@_;
open L,'<',$logfile or die "$logfile: $!\n";
seek L,-5000,2; # typical entry is 1000 bytes
my @f;
while (<L>) {
# Number of files is the count of all "files" (in the generic sense), which includes directories, symlinks, etc.
# Number of files transferred is the count of normal files that were updated via rsync's delta-transfer algorithm, which does not include created dirs, symlinks, etc.
# Total file size is the total sum of all file sizes in the transfer. This does not count any size for directories or special files, but does include the size of symlinks.
# Total transferred file size is the total sum of all files sizes for just the transferred files.
# Literal data is how much unmatched file-update data we had to send to the receiver for it to recreate the updated files.
# Matched data is how much data the receiver got locally when recreating the updated files.
if (/^Number of files: (\d+)/) {
@f=($1);
} elsif (/^Number of files transferred: (\d+)/) {
$f[1]=$1;
} elsif (/^Total file size: (\d+)/) {
$f[2]=$1;
} elsif (/^Total transferred file size: (\d+)/) {
$f[3]=$1;
} elsif (/^Literal data: (\d+)/) {
$f[4]=$1;
} elsif (/^Matched data: (\d+)/) {
$f[5]=$1;
}
}
return @f;
}
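# scan_rsync_log() returns a six-element list parsed from the rsync "--stats"
# block of a log file:
#   ($files, $files_transferred, $total_size, $transferred_size,
#    $literal_data, $matched_data)
# Only the last ~5000 bytes of the log are scanned, so the values come from
# the last stats block appended to that file, i.e. the most recent run.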
###################################
our $dbh;
sub db_open {
$dbh=DBI->connect("dbi:SQLite:dbname=${SQLITEDBFILE}","","",{
AutoCommit=>1,
PrintError=>0,
RaiseError=>1,
PrintWarn=>1,
sqlite_use_immediate_transaction => 1,
});
$dbh or die "$DBI::errstr\n";
#print "TMO: ",$dbh->sqlite_busy_timeout(),"\n"; # default 30000 msec
$dbh->sqlite_busy_timeout(24*60*60*1000);
}
##################################
use constant {
LCK_SH => 1,
LCK_EX => 2,
};
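# Locking model: locks are advisory rows in the SQLite "lock" table, tagged
# with the holder's pid. LCK_SH is a shared lock, LCK_EX an exclusive one.
# lck_can_lock() only checks compatibility and lck_lock() only inserts a row,
# so callers check-then-lock inside a single database transaction (as
# do_a_job() and expire_one() do). Rows left behind by dead processes are
# swept by purge_zombies() via the upid table.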
sub lck_lock {
my ($lck_name,$lck_mode)=@_;
$dbh->do('INSERT INTO lock(lck_name,lck_mode,lck_pid) VALUES(?,?,?)',undef,$lck_name,$lck_mode,$$);
}
sub lck_can_lock {
my ($lck_name,$want_mode)=@_;
my ($lck_mode)=$dbh->selectrow_array('SELECT MAX(lck_mode) FROM lock WHERE lck_name=?',undef,$lck_name);
if (!defined $lck_mode) {
return 1;
} elsif ($lck_mode==LCK_SH) {
return $want_mode==LCK_SH ? 1 : 0;
} else {
return 0;
}
}
sub lck_unlock {
my ($lck_name,$lck_mode)=@_;
$dbh->do('DELETE FROM lock WHERE lck_name=? AND lck_mode=? AND lck_pid=?',undef,$lck_name,$lck_mode,$$);
}
sub lck_get_pid {
my ($lck_name)=@_;
my ($lck_pid)=$dbh->selectrow_array('SELECT lck_pid FROM lock WHERE lck_name=?',undef,$lck_name);
return $lck_pid;
}
##################################
sub datadirs {
opendir D,$DATADIR or die "$DATADIR: $!\n";
my @r=map "$DATADIR/$_",sort grep !/^\./,readdir D;
closedir D;
return @r;
}
sub datadirs_for_balance {
my %datadirs=map {$_=>1} datadirs();
# delete $datadirs{"$DATADIR/C3019"}; # we want this empty
# delete $datadirs{"$DATADIR/C3028"}; # keep avail vor C3019 data
return keys %datadirs;
}
sub free_datadir {
# datadir to write to: the one with the most free space
# uses cached df
my %df=map {df_cached($_) => $_} datadirs_for_balance();
%df or die "no data directories available\n";
return $df{(sort {$b<=>$a} keys %df)[0]};
}
sub logfile {
my ($job)=@_;
return "${LOGDIR}/$job.log";
}
sub openlog {
my ($job)=@_;
my $logfile=logfile($job);
open LOG,'>>',$logfile or die "$logfile: $!\n";
}
sub fstag {
my ($path)=@_;
$path=~m"/amd/([^/]+)/(\d+)/(home|project|package|src)/" and return "$1_$2"; # /amd/pille/1/home/abt_lh/bukowiec -> pille_1
$path=~m"/amd/[^/]+/X/(X\d\d\d\d)/(home|project|package|src)/" and return "$1"; # /amd/pupsi/X/X0059/home/abt_owl/stelzl -> X0059
$path=~m"/amd/[^/]+/C/(C\d\d\d\d)/(confidential)/" and return "$1"; # /amd/taphophobie/C/C3024/confidential/clldata/data -> 'C3024'
$path=~m"/amd/([^/]+)/(\d+)/" and return "$1_$2";
$path=~m"/amd/([^/]+)/(\d+)$" and return "$1_$2";
$path=~m"/mnt/([^/]+)/(\d+)/" and return "$1_$2";
$path=~m"/mnt/([^/]+)/(\d+)$" and return "$1_$2";
$path=~m"/mnt/([^/]+)/" and return "$1";
$path=~m"^/$" and return "root";
$path=~m"/[^/]+/([^/]+)/(\d+)/" and return "$1_$2";
$path=~m"/[^/]+/([^/]+)/(\d+)$" and return "$1_$2";
return 'untagged';
}
sub timetag { # $time -> '201210051233'
my ($time)=@_;
my @f=localtime($time);
return sprintf('%04d%02d%02d%02d%02d',$f[5]+1900,$f[4]+1,$f[3],$f[2],$f[1]);
}
sub tag2time {
my ($tag)=@_;
my ($year,$mon,$mday,$hour,$min)=$tag=~/^(\d\d\d\d)(\d\d)(\d\d)(\d\d)(\d\d)$/ or return undef;
return timelocal(0,$min,$hour,$mday,$mon-1,$year-1900);
}
#################################### unique pid
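# A "unique pid" (upid) is a pid qualified by the incarnation it belongs to:
# the kernel boot_id plus the process start time (field 22 of /proc/PID/stat).
# That way a pid recycled after a reboot or by a new process is not mistaken
# for a still-running worker when stale upid and lock rows are purged.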
our $BOOT_ID; # system incarnation - lazy init
sub upid_incarnation {
my ($pid)=@_;
my $stat;
defined $BOOT_ID or $BOOT_ID=first_line_of_file('/proc/sys/kernel/random/boot_id');
-e "/proc/$pid/stat" or return undef;
eval {
$stat=first_line_of_file("/proc/$pid/stat");
};
if ($@) {
$@=~ /No such file or directory/ and return undef;
die $@;
}
my $stime=(split (" ",$stat))[21];
return $BOOT_ID.'.'.$stime;
}
sub upid_register_self {
$dbh->do('INSERT OR REPLACE INTO upid(upid_pid,upid_incarnation) VALUES (?,?)',undef,$$,upid_incarnation($$));
}
sub upid_deregister_self {
$dbh->do('DELETE FROM upid WHERE upid_pid=?',undef,$$);
}
sub upid_purge {
my $t=$dbh->selectall_arrayref('SELECT upid_pid,upid_incarnation FROM upid');
for my $r (@$t) {
my ($upid_pid,$upid_incarnation)=@$r;
my $incarnation=upid_incarnation($upid_pid);
unless (defined $incarnation && $incarnation eq $upid_incarnation) {
$dbh->do('DELETE FROM upid WHERE upid_pid=? AND upid_incarnation=?',undef,$upid_pid,$upid_incarnation);
#print "upid purged zombie : $upid_pid $upid_incarnation\n";
}
}
}
sub upid_doing {
my ($text)=@_;
$dbh->do('UPDATE upid SET upid_text=?,upid_since=? WHERE upid_pid=? AND upid_text!=?',undef,$text,time,$$,$text);
}
sub upid_clear {
upid_doing("");
}
sub upid_kill {
my ($pid)=@_;
$dbh->do('UPDATE upid SET upid_shutdown_pending=1 WHERE upid_pid=?',undef,$pid);
}
sub upid_shutdown_pending {
my ($upid_shutdown_pending) = $dbh->selectrow_array('SELECT upid_shutdown_pending FROM upid WHERE upid_pid=?',undef,$$);
return $upid_shutdown_pending;
}
################################
our %BACKUP_SERVER; # ( "fileserver-or-jobname" => {backupserver=>1, ...} , ... )
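# A hypothetical server file could look like this ('null' and 'void' being the
# backup server names read_server() accepts without a warning, presumably the
# two backup hosts of this installation; 'home_foo' is a made-up job name):
#
#   pille      null,void    # jobs from fileserver "pille" go to both hosts
#   home_foo   void         # pin a single job to one backup host
#
# Fileservers and jobs not listed at all are taken by every backup host
# (see i_want_this_fileserver below).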
sub read_server() {
%BACKUP_SERVER=();
open IN,'<',"${SERVERDATFILE}" or die "${SERVERDATFILE}: $!\n";
while (<IN>) {
chomp;
s/#.*//;
s/\s+$//;
/\S/ or next;
my ($fileserver_or_jobname,$backupserverlist)=split;
my @backupserverlist=split ',',$backupserverlist;
for my $backupserver (@backupserverlist) {
$backupserver eq 'null' or $backupserver eq 'void' or warn "${SERVERDATFILE}: line $. backup server $backupserver unknown\n";
$BACKUP_SERVER{$fileserver_or_jobname}{$backupserver}=1;
}
}
close IN;
}
sub i_want_this_fileserver {
my ($fileserver,$jobname)=@_;
exists $BACKUP_SERVER{$jobname} and return exists $BACKUP_SERVER{$jobname}{$hostname};
exists $BACKUP_SERVER{$fileserver} and return exists $BACKUP_SERVER{$fileserver}{$hostname};
return 1;
}
sub add_job {
my ($job_name,$path)=@_; # 'home_kuss','pille:/amd/pille/1/home/abt_mdl/kuss'
my $server;
($server,$path)=$path=~/^([^:]+):(\S+)$/;
i_want_this_fileserver($server,$job_name) or return;
my ($path_id) = $dbh->selectrow_array('SELECT path_id FROM path WHERE path_server=? AND path_path=?',undef,$server,$path);
unless (defined $path_id) {
$dbh->do('INSERT INTO path(path_server,path_path) VALUES (?,?)',undef,$server,$path);
($path_id)=$dbh->selectrow_array('SELECT last_insert_rowid()');
# print "added path $server:$path\n";
}
my ($job_id,$job_enabled) = $dbh->selectrow_array('SELECT job_id,job_enabled FROM job WHERE job_name=? AND job_path_id=?',undef,$job_name,$path_id);
unless (defined $job_id) {
$dbh->do('INSERT INTO job(job_name,job_path_id) VALUES (?,?)',undef,$job_name,$path_id);
($job_id)=$dbh->selectrow_array('SELECT last_insert_rowid()');
# print "added job $job_name\n";
} else {
$job_enabled or $dbh->do('UPDATE job SET job_enabled=1 WHERE job_id=?',undef,$job_id);
}
# print "job_id $job_id path_id $path_id\n";
$dbh->do('INSERT INTO vrfy (job_id) VALUES (?)',undef,$job_id);
}
#################################### netgroup
sub expand_netgroup_hosts
{
my (@netgroups)=@_;
my %DID=();
my @out=();
while (@netgroups)
{
my $ng=pop @netgroups;
for my $entry (split ' ',`ypmatch $ng netgroup`)
{
if ($entry=~/^\((\S+),.*,.*\)$/)
{
push @out,$1;
}
elsif ($entry=~/^[a-z0-9_]+$/i)
{
push @netgroups,$entry;
}
else
{
warn "ignored entry $entry\n";
}
}
}
return @out;
}
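# Netgroup entries come from NIS ("ypmatch NAME netgroup") and are either
# triples of the form "(host,user,domain)", of which only the host part is
# kept, or the name of another netgroup, which is expanded recursively.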
#################################### amd map import
our @PATTERN_EXCLUDES;
sub excluded {
my ($path)=@_;
for my $re (@PATTERN_EXCLUDES) {
$path=~/$re/ and return 1;
}
return 0;
}
sub read_extra_jobs() {
open IN,'<',"${JOBSCFGFILE}" or die "${JOBSCFGFILE}: $!\n";
while (<IN>) {
chomp;
s/#.*//;
s/\s+$//;
/\S/ or next;
my ($name, $path) = m/^(\S+)\s+(.*)$/;
$name and $path or next;
excluded($path) and next;
add_job($name, $path);
}
close IN;
}
sub read_excludes() {
@PATTERN_EXCLUDES=();
open IN,'<',"${EXCLUDESFILE}" or die "${EXCLUDESFILE}: $!\n";
while (<IN>) {
chomp;
s/#.*//;
s/\s+$//;
/\S/ or next;
eval {
push @PATTERN_EXCLUDES,qr/^$_$/;
};
$@ and warn "excludes line $. : $@";
}
close IN;
}
###########################################################################
sub datadir {
my ($job_name)=@_;
my ($volume_path) = $dbh->selectrow_array('SELECT volume_path FROM volume,stat,job WHERE stat_volume_id=volume_id AND stat_job_id=job_id AND job_name=? ORDER BY stat_started DESC',undef,$job_name);
return $volume_path||free_datadir();
}
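# do_a_job(): one backup pass, roughly:
#   - pick the most overdue enabled job (due 24 h after an ok run, 2 h after a
#     failed one) whose server and job are not locked and whose target volume
#     has at least 2 TiB free
#   - find a previous run on that volume to use as the rsync --link-dest base
#   - rsync into "<job>:INCOMPLETE:<fstag>", rename it to
#     "<job>:<timetag>:<fstag>" on success and point the "last" symlink at it
#   - record the run in the stat table (numbers from the rsync --stats output
#     plus a du of the new run directory)
# Returns 1 if a job was run, 0 if nothing was due.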
sub do_a_job {
df_cached(); # clear df cache
$dbh->begin_work();
my $sth=$dbh->prepare('
SELECT job_id,job_name,job_ok,job_started,path_server,path_path,
CASE WHEN job_ok THEN job_started-? ELSE JOB_STARTED-? END as due
FROM job,path
WHERE job_path_id=path_id
AND job_enabled
AND due<=0
ORDER BY due
');
$sth->execute(time-$deltatimeok,time-$deltatimefail);
my ($job_id,$job_name,$job_ok,$job_started,$path_server,$path_path,$due);
my $datadir;
my $volume_id;
my $started=time;
my $linkfrom_stat_rowid;
while (1) {
my $row=$sth->fetchrow_arrayref;
unless (defined $row) {
$dbh->commit;
return 0;
}
($job_id,$job_name,$job_ok,$job_started,$path_server,$path_path,$due)=@$row;
next unless lck_can_lock("SERVER.$path_server",LCK_EX);
next unless lck_can_lock("JOB.$job_name",LCK_EX);
$datadir=datadir($job_name); # $ROOT/data/X1234
-d $datadir or next;
my $free=df_cached($datadir)/1024/1024/1024;
if ($free<2) {
next;
}
($volume_id) = $dbh->selectrow_array('SELECT volume_id FROM volume WHERE volume_path=?',undef,$datadir);
unless (defined $volume_id) {
$dbh->do('INSERT INTO volume(volume_path) VALUES (?)',undef,$datadir);
($volume_id)=$dbh->selectrow_array('SELECT last_insert_rowid()');
}
$linkfrom_stat_rowid = find_linkfrom($volume_id,$job_id,$job_name,$started);
if (defined $linkfrom_stat_rowid) {
lck_can_lock("RUN.$linkfrom_stat_rowid",LCK_EX) or next;
}
defined $linkfrom_stat_rowid and lck_lock("RUN.$linkfrom_stat_rowid",LCK_EX);
lck_lock("SERVER.$path_server",LCK_EX);
lck_lock("JOB.$job_name",LCK_EX);
last;
}
$sth->finish;
$dbh->do('UPDATE job SET job_started=? WHERE job_id=?',undef,$started,$job_id);
upid_doing(sprintf("BACKUP %-15s %-13s %s:%s -> %s",$job_name,fstag($path_path),$path_server,$path_path,$datadir));
$dbh->commit;
my $jobdir="$datadir/$job_name"; # $ROOT/data/X1234/home_buczek
-d $jobdir or mkdir $jobdir or die "$jobdir: $!\n";
my $fstag=fstag($path_path);
my $dst=sprintf('%s:%s:%s',$job_name,timetag($started),$fstag); # home_buczek:201211041322:pille_1
my $tmp=sprintf('%s:%s:%s',$job_name,'INCOMPLETE',$fstag); # home_buczek:INCOMPLETE:pille_1
my ($linkfrom_job_name,$linkfrom_stat_started,$linkfrom_path_path);
my ($linkfrom_run_name);
if (defined $linkfrom_stat_rowid) {
($linkfrom_job_name,$linkfrom_stat_started,$linkfrom_path_path)=$dbh->selectrow_array(<<'EOF',undef,$linkfrom_stat_rowid);
SELECT job_name,stat_started,path_path
FROM stat,job,path
WHERE stat_job_id=job_id
AND job_path_id=path_id
AND stat.rowid=?
EOF
$linkfrom_run_name=run_name($linkfrom_job_name,$linkfrom_stat_started,$linkfrom_path_path);
}
openlog ($job_name);
my $src="$path_server:$path_path";
my $START_MSG=scalar(localtime).": $job_name : $src -> $dst".($linkfrom_stat_rowid?" (BASE:$linkfrom_job_name/$linkfrom_run_name)":'').": started\n";
# warn $START_MSG;
print LOG $START_MSG;
my $pid=fork;
defined $pid or die "$0: $!\n";
unless ($pid) {
open STDIN,'<','/dev/null';
open STDOUT,'>&LOG';
open STDERR,'>&LOG';
chdir $jobdir or die "$jobdir: $!\n";
exec 'rsync',
'-aHx', # -a : --archive : --recursive --links --perms --times --group --owner --devices --specials
# -H : --hard-links
# -x : --one-file-system
'--delete',
'--numeric-ids',
'--bwlimit','30000',
# '--progress',
'--stats',
'--filter',". ${RSYNCFILTERFILE}",
# '--log-file',"$ROOT/test.log",
# '-v',
($linkfrom_stat_rowid?('--link-dest',"../../$linkfrom_job_name/$linkfrom_run_name"):()), # relative to destination dir
'-e',"ssh -oFallBackToRsh=no -oStrictHostKeyChecking=no -oBatchMode=yes -i ${SSHPRIVATEKEY}",
"$src/.","./$tmp";
die "$0: $!\n";
}
waitpid($pid,0);
$?=0 if $?==24<<8; # exit status 24 : some files vanished
my $ok= $? ? 0 : 1;
if ($?) {
print LOG scalar(localtime).": $job_name : $src -> $dst".($linkfrom_stat_rowid?" (BASE:$linkfrom_job_name/$linkfrom_run_name)":'').": finished with error ($?)\n";
} else {
print LOG scalar(localtime).": $job_name : $src -> $dst".($linkfrom_stat_rowid?" (BASE:$linkfrom_job_name/$linkfrom_run_name)":'').": finished ok\n";
rename "$jobdir/$tmp","$jobdir/$dst" or die "$jobdir/$dst: $!\n";
unlink("$jobdir/last");
symlink $dst,"$jobdir/last" or die "$jobdir/last: $!\n";
eval { # silently ignore errors
my ($files,$transferred_files,$size,$transferred_size,$unmatched,$matched)=scan_rsync_log(logfile($job_name));
my ($stat_du_bytes)=du_bytes("$jobdir/$dst");
print LOG scalar(localtime).": du done\n";
my $seconds=time-$started;
my $type=$linkfrom_job_name?'I':'F';
#print "$job_name: $files files $transferred_files transferred $size bytes $transferred_size transferred $seconds seconds\n";
$dbh->do('INSERT INTO stat (stat_job_id,stat_started,stat_files,stat_files_transferred,stat_bytes,stat_bytes_transferred,stat_du_bytes,stat_seconds,stat_type,stat_volume_id) VALUES (?,?,?,?,?,?,?,?,?,?)',undef,$job_id,$started,$files,$transferred_files,$size,$transferred_size,$stat_du_bytes,$seconds,$type,$volume_id);
};
}
close LOG;
$dbh->begin_work;
$dbh->do('UPDATE job SET job_ok=? WHERE job_id=?',undef,($ok?1:0),$job_id);
lck_unlock("SERVER.$path_server",LCK_EX);
lck_unlock("JOB.$job_name",LCK_EX);
defined $linkfrom_stat_rowid and lck_unlock("RUN.$linkfrom_stat_rowid",LCK_EX);
$dbh->commit;
return 1;
}
sub amd_scan {
my ($force_now)=@_;
$dbh->begin_work;
if (!$force_now) {
my ($g_last_amd_scan)=$dbh->selectrow_array('SELECT g_last_amd_scan FROM global');
if ($g_last_amd_scan+60*60 >= time) {
$dbh->commit;
return;
}
}
$dbh->do('CREATE TEMPORARY TABLE IF NOT EXISTS vrfy (job_id INTEGER)');
$dbh->do('DELETE FROM vrfy');
read_excludes();
read_server();
read_extra_jobs();
$dbh->do('UPDATE job SET job_enabled=0 WHERE job_id NOT IN vrfy');
$dbh->do('DROP TABLE vrfy');
$dbh->do('UPDATE global SET g_last_amd_scan=?',undef,time);
$dbh->commit;
}
sub do_jobs {
while (1) {
while (1) {
amd_scan(0);
upid_shutdown_pending() and exit 0;
do_a_job() or last;
upid_shutdown_pending() and exit 0;
upid_doing('BACKUP pause');
sleep 1;
upid_shutdown_pending() and exit 0;
}
upid_shutdown_pending() and exit 0;
upid_doing('BACKUP idle, waiting');
sleep 10;
upid_shutdown_pending() and exit 0;
}
}
sub purge_zombies {
upid_purge();
$dbh->do('DELETE FROM lock WHERE lck_pid NOT IN (SELECT upid_pid from upid)');
return;
}
sub migrate_one {
df_cached();
my $datadir_src="$DATADIR/C3019";
my @dd=sort {df_cached($a) <=> df_cached($b)} datadirs_for_balance();
@dd>=2 or return 0;
my $datadir_dst=$dd[$#dd];
if (df_cached($datadir_dst)<5*1024*1024*1024) {
return 0;
}
my ($job_name,$weight)=$dbh->selectrow_array(<<'EOF',undef,$datadir_src);
SELECT job_name,MAX(stat_bytes)+sum(stat_bytes_transferred) AS weight
FROM job,stat,volume
WHERE stat_job_id=job_id AND stat_volume_id=volume_id
AND volume_path=?
GROUP BY job_name
ORDER BY weight DESC
LIMIT 1
EOF
defined $job_name or return 0;
my ($to) = $datadir_dst =~ m(^${DATADIR}/([^/]+)$);
defined $to or die;
return move_new($job_name,$to,'MIGRATE');
}
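# balance_one(): move the "lightest" job (smallest MAX(stat_bytes) +
# SUM(stat_bytes_transferred)) away from the volume with the least free space,
# but only when that volume has dropped below 5 TiB free and some other volume
# still has more than 8 TiB free. migrate_one() above is the same idea with a
# hard-coded source volume (C3019), moving the heaviest job to the emptiest
# volume as long as that destination has at least 5 TiB free.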
sub balance_one {
df_cached();
my @dd=sort {df_cached($a) <=> df_cached($b)} datadirs_for_balance();
#for (@dd) {
# print "$_ : ",human(df_cached($_)),"\n";
#}
@dd>=2 or return 0;
my $datadir_minfree=$dd[0];
my $datadir_maxfree=$dd[$#dd];
#print "datadir minfree: $datadir_minfree \n";
unless (df_cached($datadir_minfree)<5*1024*1024*1024) {
# warn "no need to balance: minimum free space over 5 TB\n";
return 0;
}
#if (df_cached($datadir_maxfree)<10*1024*1024*1024) {
if (df_cached($datadir_maxfree)<8*1024*1024*1024) {
# warn "no dataspace over 10 TB free\n";
# warn "no dataspace over 8 TB free\n";
return 0;
}
my ($job_name,$weight)=$dbh->selectrow_array(<<'EOF',undef,$datadir_minfree);
SELECT job_name,MAX(stat_bytes)+sum(stat_bytes_transferred) AS weight
FROM job,stat,volume
WHERE stat_job_id=job_id AND stat_volume_id=volume_id
AND volume_path=?
GROUP BY job_name
ORDER BY weight ASC
LIMIT 1
EOF
defined $job_name or return 0;
#print "move $job_name from $datadir_minfree to $datadir_maxfree weight ",human($weight),"\n";
my ($to) = $datadir_maxfree =~ m(^${DATADIR}/([^/]+)$);
defined $to or die;
return move_new($job_name,$to,'BALANCE');
}
sub run_name {
my ($job_name,$started,$path)=@_;
my $timetag=timetag($started);
my $fstag=fstag($path);
return "$job_name:$timetag:$fstag";
}
sub parse_run_name {
my ($run_name)=@_;
my ($job_name,$timetag,$fstag) = $run_name=~/^([^:]+):([^:]+):([^:]*)$/ or return undef;
return ($job_name,$timetag,$fstag);
}
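# Run directories are named "<job_name>:<timetag>:<fstag>", e.g.
# "home_buczek:201211041322:pille_1". run_name() builds such a name from the
# database fields and parse_run_name() splits it again (used by cmd_verify to
# match directories on disk against stat rows).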
sub rsync_copy {
my ($src_path,$dst_path,$link_path)=@_;
my $pid=fork;
defined $pid or die "$0: $!\n";
unless ($pid) {
my @cmd=(
'rsync',
'-aHx', # -a : --archive : --recursive --links --perms --times --group --owner --devices --specials
# -H : --hard-links
# -x : --one-file-system
'--delete',
'--numeric-ids',
($link_path?('--link-dest',$link_path):()),
$src_path,$dst_path
);
# warn "DEBUG: ",join(' ',@cmd),"\n";
exec @cmd;
die "$0: $!\n";
}
waitpid($pid,0);
$? and exit 1;
}
sub distmaster {
my ($hostname)=@_;
return 'deinemuddah';
}
sub find_linkfrom {
my ($volume_id,$job_id,$job_name,$stat_started)=@_;
my $linkfrom_stat_rowid;
# 1 : same job_id
($linkfrom_stat_rowid)= $dbh->selectrow_array(<<'EOF',undef,$job_id,$volume_id,$stat_started);
SELECT stat.rowid FROM stat
WHERE stat_job_id=?
AND stat_volume_id=?
ORDER BY ABS(stat_started-?)
EOF
return $linkfrom_stat_rowid if defined $linkfrom_stat_rowid;
# 2 : same job_name
($linkfrom_stat_rowid)= $dbh->selectrow_array(<<'EOF',undef,$job_name,$volume_id,$stat_started);
SELECT stat.rowid FROM stat,job
WHERE stat_job_id=job_id
AND job_name=?
AND stat_volume_id=?
ORDER BY ABS(stat_started-?)
EOF
return $linkfrom_stat_rowid if defined $linkfrom_stat_rowid;
return undef; # disabled until distmaster() is working...
# 3 : try same sys_ or usr_ family
$job_name =~ /^(sys|usr)_(.+)/ or return undef;
my ($prefix,$hostname)=($1,$2);
my $distmaster=distmaster($hostname);
my $sth=$dbh->prepare(<<'EOF');
SELECT stat.rowid,job_name FROM stat,job
WHERE
stat_job_id=job_id
AND job_name GLOB ?
AND stat_volume_id=?
ORDER BY ABS(stat_started-?)
EOF
$sth->execute($prefix.'_*',$volume_id,$stat_started);
while (my $row=$sth->fetchrow_arrayref) {
my $linkfrom_job_name;
($linkfrom_stat_rowid,$linkfrom_job_name)=@$row;
$linkfrom_job_name =~ /^(sys|usr)_(.+)/ or die;
my ($linkfrom_prefix,$linkfrom_hostname)=($1,$2);
my $linkfrom_distmaster=distmaster($linkfrom_hostname);
if ($linkfrom_distmaster eq $distmaster) {
$sth->finish();
return $linkfrom_stat_rowid;
}
}
return undef;
}
sub cmd_disable {
@ARGV==1 or die USAGE;
my ($job_name)=@ARGV;
$dbh->do('UPDATE job SET job_enabled=0 WHERE job_name=?',undef,$job_name);
}
sub cmd_test {
my $bytes=du_bytes($_[0]);
print "$_[0] : $bytes bytes\n";
}
sub move_new {
my ($job_name,$to, $status_tag) = @_; # 'prj_AGHucho' , [ 'C4123' ] , 'BALANCE'
$status_tag||='BALANCE';
my $dst_volume_path;
if ($to) {
$dst_volume_path="$DATADIR/$to";
} else {
df_cached(); # clear df cache
$dst_volume_path=free_datadir(); # $ROOT/data/C4321
($to)=$dst_volume_path=~m"([^/]*)$";
}
-d $dst_volume_path or die "$dst_volume_path: $!\n";
my ($dst_volume_id) = $dbh->selectrow_array('SELECT volume_id FROM volume WHERE volume_path=?',undef,$dst_volume_path);
unless (defined $dst_volume_id) {
$dbh->do('INSERT INTO volume(volume_path) VALUES (?)',undef,$dst_volume_path);
($dst_volume_id)=$dbh->selectrow_array('SELECT last_insert_rowid()');
}
while (1) {
my $free=df($dst_volume_path)/1024/1024/1024;
if ($free<2) {
#die "$dst_volume_path: only $free TB free, 2 required.\n";
return 0;
}
$dbh->begin_work;
#
# newest run not on target volume
#
my ($stat_rowid,$src_volume_id,$src_volume_path,$stat_started,$path_path,$job_id) = $dbh->selectrow_array(<<'EOF',undef,$job_name,$dst_volume_id);
SELECT stat.rowid,volume_id,volume_path,stat_started,path_path,job_id
FROM stat,volume,job,path
WHERE
job_name=? AND volume_id <> ?
AND stat_job_id=job_id AND job_path_id=path_id AND stat_volume_id=volume_id
ORDER BY stat_started DESC
EOF
unless (defined $stat_rowid) {
$dbh->commit;
last;
}
unless (lck_can_lock("RUN.$stat_rowid",LCK_EX)) {
$dbh->commit;
sleep 10;
redo;
}
my ($from)=$src_volume_path=~m"([^/]*)$"; # 'C1234'
my $run_name=run_name($job_name,$stat_started,$path_path);
#
# run of same job on target volume, nearest date
# may be void
#
my $linkfrom_stat_rowid = find_linkfrom($dst_volume_id,$job_id,$job_name,$stat_started);
if (defined $linkfrom_stat_rowid) {
unless (lck_can_lock("RUN.$linkfrom_stat_rowid",LCK_EX)) {
$dbh->commit;
sleep 10;
redo;
}
my ($linkfrom_job_name,$linkfrom_stat_started,$linkfrom_path_path)=$dbh->selectrow_array(<<'EOF',undef,$linkfrom_stat_rowid);
SELECT job_name,stat_started,path_path
FROM stat,job,path
WHERE stat_job_id=job_id
AND job_path_id=path_id
AND stat.rowid=?
EOF
my $linkfrom_run_name=run_name($linkfrom_job_name,$linkfrom_stat_started,$linkfrom_path_path);
warn scalar(localtime)," $status_tag : $from $run_name -> $to $run_name (link $linkfrom_run_name)\n";
upid_doing("$status_tag $from $run_name -> $to $run_name (link $linkfrom_run_name)");
lck_lock("RUN.$stat_rowid",LCK_EX);
lck_lock("RUN.$linkfrom_stat_rowid",LCK_EX);
$dbh->commit();
rsync_copy("$src_volume_path/$job_name/$run_name/","$dst_volume_path/$job_name/$run_name","$dst_volume_path/$linkfrom_job_name/$linkfrom_run_name");
rename("$src_volume_path/$job_name/$run_name","$src_volume_path/$job_name/$run_name.BEING_DELETED") or
die "$src_volume_path/$job_name/$run_name $src_volume_path/$job_name/$run_name.BEING_DELETED : $!\n";
$dbh->do('UPDATE stat SET stat_volume_id=? WHERE rowid=?',undef,$dst_volume_id,$stat_rowid);
lck_unlock("RUN.$stat_rowid",LCK_EX);
lck_unlock("RUN.$linkfrom_stat_rowid",LCK_EX);
} else {
unless (lck_can_lock("JOB.$job_name",LCK_EX)) {
$dbh->commit;
sleep 10;
redo;
}
warn scalar(localtime)," $status_tag : $from $run_name -> $to $run_name\n";
upid_doing("$status_tag $from $run_name -> $to $run_name");
lck_lock("JOB.$job_name",LCK_EX);
$dbh->commit();
-d "$dst_volume_path/$job_name" or mkdir "$dst_volume_path/$job_name" or die "$dst_volume_path/$job_name: $!\n";
-e "$dst_volume_path/$job_name/$run_name" and die "$dst_volume_path/$job_name/$run_name: exists\n";
system "cp -a $src_volume_path/$job_name/$run_name $dst_volume_path/$job_name/$run_name" and exit 1;
rename("$src_volume_path/$job_name/$run_name","$src_volume_path/$job_name/$run_name.BEING_DELETED") or
die "$src_volume_path/$job_name/$run_name $src_volume_path/$job_name/$run_name.BEING_DELETED : $!\n";
$dbh->do('UPDATE stat SET stat_volume_id=? WHERE rowid=?',undef,$dst_volume_id,$stat_rowid);
lck_unlock("JOB.$job_name",LCK_EX);
}
upid_doing("$status_tag rm -rf $src_volume_path/$job_name/$run_name.BEING_DELETED");
warn scalar(localtime)," $status_tag : rm -rf $src_volume_path/$job_name/$run_name.BEING_DELETED\n";
system "rm -rf $src_volume_path/$job_name/$run_name.BEING_DELETED" and exit 1;
rmdir "$src_volume_path/$job_name"; # (if empty)
}
warn scalar(localtime)." $status_tag : done\n";
return 1;
}
sub shortDate {
my ($time)=@_;
my $string=localtime($time); # Sat Nov 3 09:33:14 2012
return substr($string,4,12);
}
sub humanSeconds {
my ($seconds)=@_;
$seconds<60 and return sprintf ('%2d s',$seconds);
$seconds<3600 and return sprintf ('%2d m %2d s',$seconds/60,$seconds%60);
$seconds<86400 and return sprintf ('%2d h %2d m',$seconds/3600,$seconds%3600/60);
return sprintf('%2d d %2d h',$seconds/86400,$seconds%86400/3600);
#
# now
# 12 s
# 5 m 43 s
# 12 h 59 m
# 5 d 3 h
}
sub humanDue {
my ($due)=@_;
$due==0 and return 'DUE now';
$due>0 and return 'due in '.humanSeconds($due);
$due < -1359900000 and return 'DUE since ever';
$due<0 and return 'DUE since '.humanSeconds(-$due);
}
sub cmd_status {
$dbh->begin_work;
my $sth;
$sth=$dbh->prepare('SELECT upid_pid,upid_text,upid_since FROM upid');
$sth->execute;
print "processes\n";
while (my $row=$sth->fetchrow_arrayref) {
my ($pid,$what,$upid_since)=@$row;
my $since=$upid_since ? humanSeconds(time-$upid_since) : '-';
printf(" %-9s : %9s : %s\n",$pid,$since,$what);
}
print "next 30 due\n";
$sth=$dbh->prepare('
SELECT job_id,job_name,job_ok,job_started,path_server,path_path,
CASE WHEN job_ok THEN job_started-? ELSE JOB_STARTED-? END as due
FROM job,path
WHERE job_path_id=path_id
AND job_enabled
ORDER BY due
limit 30
');
$sth->execute(time-$deltatimeok,time-$deltatimefail);
while (my $row=$sth->fetchrow_arrayref) {
my ($job_id,$job_name,$job_ok,$job_started,$path_server,$path_path,$due)=@$row;
my $lpid=lck_get_pid("JOB.$job_name");
my $state=$lpid?"PID $lpid":($job_ok?'ok':$job_started?'fail':'-');
my $time=$job_started?shortDate($job_started):'never';
my $src=$path_server.':'.$path_path;
printf(" %-19s %-9s %-12s %-25s %-15s %s\n",humanDue($due),$state,$time,$job_name,fstag($path_path),$src);
}
$dbh->commit;
}
sub cmd_jobs {
$dbh->begin_work;
my $sth=$dbh->prepare('
SELECT job_id,job_name,job_ok,job_started,job_enabled,path_server,path_path,
CASE WHEN job_ok THEN job_started-? ELSE JOB_STARTED-? END as due
FROM job,path
WHERE job_path_id=path_id
ORDER BY due
');
$sth->execute(time-$deltatimeok,time-$deltatimefail);
while (my $row=$sth->fetchrow_arrayref) {
my ($job_id,$job_name,$job_ok,$job_started,$job_enabled,$path_server,$path_path,$due)=@$row;
my $lpid=lck_get_pid("JOB.$job_name");
my $state=$lpid?"PID $lpid":($job_ok?'ok':$job_started?'fail':'-');
my $time=$job_started?shortDate($job_started):'never';
my $src=$path_server.':'.$path_path;
printf("%19s %-9s %12s %-28s %-15s %s\n",($job_enabled?(humanDue($due)):'disabled'),$state,$time,$job_name,fstag($path_path),$src);
}
$dbh->commit;
}
sub human {
my ($n)=@_;
$n<1024 and return sprintf("%6d byte",$n);
$n/=1024;
$n<1024 and return sprintf("%6.1f KiB",$n);
$n/=1024;
$n<1024 and return sprintf("%6.1f MiB",$n);
$n/=1024;
$n<1024 and return sprintf("%6.1f GiB",$n);
$n/=1024;
return sprintf("%6.1f TiB",$n);
}
sub show_stat_table {
my ($sql,@bindings)=@_;
my $sth=$dbh->prepare($sql);
$sth->execute(@bindings);
while (my $row=$sth->fetchrow_arrayref) {
my ($job_name,$stat_started,$stat_bytes,$stat_seconds,$stat_bytes_transferred,$path_server,$path_path)=@$row;
printf " %-21s %s %11s (%11s transferred in %9s) %s:%s\n",$job_name,shortDate($stat_started),human($stat_bytes),human($stat_bytes_transferred),humanSeconds($stat_seconds),$path_server,$path_path;
}
}
our $COUNT_SQL=<<'EOF';
SELECT
stat_job_id AS count_job_id,
count(stat_job_id) as count_count
FROM stat
GROUP BY stat_job_id
EOF
sub cmd_stat {
my $last_stat="SELECT stat_job_id AS last_stat_job_id,MAX(stat_started) AS last_stat_started FROM stat GROUP BY stat_job_id";
my $last_inc_stat="SELECT stat_job_id AS last_stat_job_id,MAX(stat_started) AS last_stat_started FROM stat WHERE stat_type='I' GROUP BY stat_job_id";
my $join_last="stat_job_id=last_stat_job_id AND stat_started=last_stat_started";
my $cols="job_name,stat_started,stat_bytes,stat_seconds,stat_bytes_transferred,path_server,path_path";
my $tabs="job,stat,path";
my $join="stat_job_id=job_id AND job_path_id=path_id";
my $active="job_enabled";
$dbh->begin_work;
my $sth;
if (@ARGV==0) {
print "TOP 10 source size (all), last run\n";
show_stat_table("SELECT $cols FROM $tabs,($last_stat) WHERE $active AND $join AND $join_last ORDER BY stat_bytes DESC LIMIT 10");
print "TOP 10 elapsed (Incrementals) , last run\n";
show_stat_table("SELECT $cols FROM $tabs,($last_inc_stat) WHERE $active AND $join AND $join_last ORDER BY stat_seconds DESC LIMIT 10");
print "TOP 10 transferred (Incrementals) , last run \n";
show_stat_table("SELECT $cols FROM $tabs,($last_inc_stat) WHERE $active AND $join AND $join_last ORDER BY stat_bytes_transferred DESC LIMIT 10");
} elsif (@ARGV==1 && $ARGV[0] eq 'expire') {
print "OLDEST 20\n";
show_stat_table("SELECT $cols FROM $tabs,($COUNT_SQL) WHERE $join AND stat_job_id=count_job_id AND count_count>1 ORDER BY stat_started LIMIT 20");
my $keep=time-$DAYS*24*60*60;
print "\n";
print "(expire time : ".shortDate($keep).")\n";
} elsif (@ARGV==1 && $ARGV[0] eq 'zombies') {
my $keep=time-$DAYS*24*60*60;
print "zombies\n";
show_stat_table("SELECT $cols FROM $tabs,($COUNT_SQL) WHERE $join AND stat_job_id=count_job_id AND count_count==1 AND stat_started<? ORDER BY stat_bytes DESC",$keep);
print "\n";
print "(expire time : ".shortDate($keep).")\n";
} elsif (@ARGV==1 && $ARGV[0] eq 'weight') {
print "top 20 weight jobs ( max trans weight ):\n";
$sth=$dbh->prepare(<<'EOF');
SELECT job_name,path_path,MAX(stat_bytes),SUM(stat_bytes_transferred),MAX(stat_bytes)+SUM(stat_bytes_transferred) AS weight,job_enabled
FROM job,stat,path
WHERE stat_job_id=job_id
AND job_path_id=path_id
GROUP BY job_name
ORDER BY weight DESC
LIMIT 20
EOF
$sth->execute();
while (my $row=$sth->fetchrow_arrayref) {
my ($job_name,$path_path,$max_bytes,$sum_transferred,$weight,$job_enabled)=@$row;
printf "%-25s %20s %11s %11s %11s %s\n",$job_name,fstag($path_path),human($max_bytes),human($sum_transferred),human($weight),$job_enabled?'':'(disabled)';
}
print "\n";
print "top 20 weight disabled jobs ( max trans weight ) :\n";
$sth=$dbh->prepare(<<'EOF');
SELECT job_name,path_path,MAX(stat_bytes),SUM(stat_bytes_transferred),MAX(stat_bytes)+SUM(stat_bytes_transferred) AS weight,job_enabled
FROM job,stat,path
WHERE stat_job_id=job_id
AND job_path_id=path_id
AND job_enabled=0
GROUP BY job_name
ORDER BY weight DESC
LIMIT 20
EOF
$sth->execute();
while (my $row=$sth->fetchrow_arrayref) {
my ($job_name,$path_path,$max_bytes,$sum_transferred,$weight,$job_enabled)=@$row;
printf "%-25s %20s %11s %11s %11s %s\n",$job_name,fstag($path_path),human($max_bytes),human($sum_transferred),human($weight),$job_enabled?'':'(disabled)';
}
} elsif (@ARGV==1 && $ARGV[0] eq 'week') {
$sth=$dbh->prepare(<<"EOF");
SELECT job_name,path_path,SUM(stat_seconds),SUM(stat_bytes_transferred) AS s,job_enabled
FROM $tabs
WHERE $join AND stat_started>?
GROUP BY job_id
ORDER BY s DESC
LIMIT 20
EOF
$sth->execute(time-7*24*60*60);
while (my $row=$sth->fetchrow_arrayref) {
my ($job_name,$path_path,$sum_stat_seconds,$sum_stat_bytes_transferred,$job_enabled)=@$row;
printf "%-25s %-20s %10s %11s %s\n",$job_name,fstag($path_path),humanSeconds($sum_stat_seconds),human($sum_stat_bytes_transferred),$job_enabled?'':'(disabled)';
}
} elsif (@ARGV==1 && $ARGV[0] eq 'oversize') {
my $cnt=0;
$sth=$dbh->prepare(<<"EOF");
SELECT job_name,stat_bytes,stat_du_bytes,path_path
FROM $tabs,($last_stat)
WHERE $join AND stat_bytes>? AND job_enabled=1 AND $join_last
ORDER BY stat_bytes DESC
EOF
$sth->execute(1024*1024*1024*1024);
while (my $row=$sth->fetchrow_arrayref) {
$cnt==0 and print "The following backup jobs have a source size over 1 TB:\n\n";
my ($job_name,$stat_bytes,$stat_du_bytes,$path_path)=@$row;
printf "%-20s %-60s : %11s (du: %11s)\n",$job_name,$path_path,human($stat_bytes),human($stat_du_bytes||0);
$cnt++;
}
$cnt and exit 1;
print "No jobs have a source size over 1 TB:\n\n";
} elsif (@ARGV==1 && $ARGV[0] eq 'dusize') {
my $cnt=0;
$sth=$dbh->prepare(<<"EOF");
SELECT job_name,stat_bytes,stat_du_bytes,path_path
FROM $tabs,($last_stat)
WHERE $join AND job_enabled=1 AND $join_last
ORDER BY stat_bytes DESC
EOF
$sth->execute();
while (my $row=$sth->fetchrow_arrayref) {
$cnt==0 and print "rsync size vs. du size:\n\n";
my ($job_name,$stat_bytes,$stat_du_bytes,$path_path)=@$row;
defined $stat_du_bytes or next;
printf "%-15d %-30s : RSYNC: %11s DU: %11s\n",$stat_du_bytes-$stat_bytes,$job_name,human($stat_bytes),human($stat_du_bytes||0);
$cnt++;
}
}
else {
die USAGE;
}
$dbh->commit;
}
sub cmd_job {
@ARGV==1 or die USAGE;
my ($job_name)=@_;
$dbh->begin_work;
my $sth;
$sth=$dbh->prepare('
SELECT job_id,job_name,job_ok,job_started,job_enabled,path_server,path_path,
CASE WHEN job_ok THEN job_started-? ELSE JOB_STARTED-? END as due
FROM job,path
WHERE job_path_id=path_id
AND job_name=?
ORDER BY due
');
$sth->execute(time-$deltatimeok,time-$deltatimefail,$job_name);
while (my $row=$sth->fetchrow_arrayref) {
my ($job_id,$job_name,$job_ok,$job_started,$job_enabled,$path_server,$path_path,$due)=@$row;
my $lpid=lck_get_pid("JOB.$job_name")||'';
my $state=$lpid?"PID $lpid":($job_ok?'ok':$job_started?'fail':'-');
my $time=$job_started?shortDate($job_started):'never';
my $src=$path_server.':'.$path_path;
printf("%19s %-9s %24s %-28s %-15s %s %s\n",($job_enabled?(humanDue($due)):'disabled'),$state,$time,$job_name,fstag($path_path),$src,$lpid);
}
print "\n";
$sth=$dbh->prepare('select stat_type,job_name,stat_started,stat_bytes,stat_du_bytes,stat_seconds,stat_bytes_transferred,path_server,path_path from job,stat,path where stat_job_id=job_id and job_path_id=path_id and job_name=? order by stat_started');
$sth->execute($job_name);
while (my $row=$sth->fetchrow_arrayref) {
my ($stat_type,$job_name,$stat_started,$stat_bytes,$stat_du_bytes,$stat_seconds,$stat_bytes_transferred,$path_server,$path_path)=@$row;
printf " %s %-21s %s %11s %11s (%11s transferred in %11s) %s:%s\n",$stat_type,$job_name,shortDate($stat_started),human($stat_bytes),human($stat_du_bytes||0),human($stat_bytes_transferred),humanSeconds($stat_seconds),$path_server,$path_path;
}
$dbh->commit;
}
sub cmd_fix_stat {
upid_register_self();
my $sth=$dbh->prepare('SELECT stat.rowid,stat_started,job_name,volume_path,path_path FROM stat,job,volume,path WHERE stat_du_bytes IS NULL AND stat_volume_id=volume_id AND stat_job_id=job_id AND job_path_id=path_id ORDER BY stat.rowid DESC');
SCAN: while (1) {
upid_doing("du scan");
$dbh->begin_work();
$sth->execute();
my $retry=0;
while (my $row=$sth->fetchrow_arrayref) {
my ($stat_rowid,$stat_started,$job_name,$volume_path,$path_path)=@$row;
my $dir=sprintf '%s/%s/%s:%s:%s',$volume_path,$job_name,$job_name,timetag($stat_started),fstag($path_path);
printf "%5d %s %-30s %s %s\n",$stat_rowid,shortDate($stat_started),$job_name,$volume_path,$dir;
unless (lck_can_lock("JOB.$job_name",LCK_EX)) {
print " *** LOCKED ***\n";
$retry++;
next;
}
unless (-d $dir) {
print " *** missing : $dir\n";
next;
}
lck_lock("JOB.$job_name",LCK_EX);
upid_doing("du $dir");
$sth->finish();
$dbh->commit();
my ($stat_du_bytes)=du_bytes("$dir");
$dbh->do('UPDATE stat SET stat_du_bytes=? WHERE rowid=?',undef,$stat_du_bytes,$stat_rowid);
print " ... ",human($stat_du_bytes),"\n";
lck_unlock("JOB.$job_name",LCK_EX);
redo SCAN;
}
$dbh->commit();
last unless $retry;
}
upid_deregister_self();
}
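# expire_one(): delete the oldest expirable run, i.e. one older than $DAYS
# (185) days whose job still has more than one run recorded (count_count>1),
# so the last remaining run of a job is never expired; that is what
# "stat zombies" lists. Returns 1 after removing one run, 0 when nothing is
# expirable.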
sub expire_one {
$dbh->begin_work();
my $keep=time-$DAYS*24*60*60;
my $sth=$dbh->prepare(<<"EOF");
SELECT stat.rowid,job_name,stat_started,path_path,volume_path
FROM stat,job,path,volume,($COUNT_SQL)
WHERE stat_volume_id=volume_id AND job_path_id=path_id AND stat_job_id=job_id AND job_id=count_job_id
AND stat_started<?
AND count_count>1
ORDER BY stat_started
EOF
$sth->execute($keep);
my ($stat_rowid,$job_name,$stat_started,$path_path,$volume_path);
my $locked;
while (my $row=$sth->fetchrow_arrayref) {
($stat_rowid,$job_name,$stat_started,$path_path,$volume_path)=@$row;
$locked = 0;
lck_can_lock("JOB.$job_name",LCK_EX) or next;
lck_can_lock("EXPIRE.$volume_path",LCK_EX) or next;
lck_can_lock("RUN.$stat_rowid",LCK_EX) or next;
lck_lock("JOB.$job_name",LCK_EX) or next;
lck_lock("EXPIRE.$volume_path",LCK_EX) or next;
lck_lock("RUN.$stat_rowid",LCK_EX) or next;
$locked = 1;
my $timetag=timetag($stat_started);
my $fstag=fstag($path_path);
my $dir="$volume_path/$job_name/$job_name:$timetag:$fstag";
-d $dir or next;
#print scalar(localtime($stat_started))," : $dir\n";
#next;
$sth->finish();
system "mv $dir $dir.EXPIRED" and exit 1;
$dbh->do('DELETE FROM stat WHERE rowid=?',undef,$stat_rowid);
upid_doing("EXPIRE rm -rf $dir.EXPIRED");
$dbh->commit();
system "rm -rf $dir.EXPIRED" and exit 1;
lck_unlock("JOB.$job_name",LCK_EX) or next;
lck_unlock("EXPIRE.$volume_path",LCK_EX) or next;
lck_unlock("RUN.$stat_rowid",LCK_EX) or next;
return 1;
} continue {
$locked and lck_unlock("JOB.$job_name",LCK_EX) or next;
$locked and lck_unlock("EXPIRE.$volume_path",LCK_EX) or next;
$locked and lck_unlock("RUN.$stat_rowid",LCK_EX) or next;
}
$dbh->commit();
return 0;
}
sub cmd_balance {
upid_register_self();
while (1) {
while (1) {
balance_one() or last;
upid_shutdown_pending() and exit 0;
upid_doing('BALANCE pause');
sleep 1;
upid_shutdown_pending() and exit 0;
}
upid_shutdown_pending() and exit 0;
upid_doing('BALANCE idle, waiting');
sleep 10;
upid_shutdown_pending() and exit 0;
}
}
sub cmd_migrate {
upid_register_self();
while (1) {
while (1) {
migrate_one() or last;
upid_shutdown_pending() and exit 0;
upid_doing('MIGRATE pause');
sleep 1;
upid_shutdown_pending() and exit 0;
}
upid_shutdown_pending() and exit 0;
upid_doing('MIGRATE idle, waiting');
sleep 10;
upid_shutdown_pending() and exit 0;
}
}
sub cmd_expire {
upid_register_self();
while (1) {
while (1) {
expire_one() or last;
upid_shutdown_pending() and exit 0;
}
upid_shutdown_pending() and exit 0;
upid_doing('EXPIRE idle, waiting');
sleep 10;
upid_shutdown_pending() and exit 0;
}
}
sub cmd_move {
my ($job_name,$to)=@_; # 'prj_AGHucho' , [ 'C4123' ]
upid_register_self();
move_new($job_name,$to,'MOVE ');
}
sub cmd_do_jobs {
upid_register_self();
do_jobs();
upid_deregister_self();
}
sub cmd_amd_scan {
amd_scan(1);
}
sub cmd_kill {
@ARGV>=1 or die "usage: $0 kill pid...\n";
for my $pid (@ARGV) {
upid_kill($pid);
}
}
our $READDIR_CACHED_DIR='';
our %READDIR_CACHED_ENTRIES;
sub file_exists_cached {
my ($dir,$fn)=@_;
if ($dir ne $READDIR_CACHED_DIR) {
$READDIR_CACHED_DIR=$dir;
%READDIR_CACHED_ENTRIES=();
opendir D,$dir or return 0;
$READDIR_CACHED_ENTRIES{$_}=1 for readdir D;
closedir D;
}
return exists $READDIR_CACHED_ENTRIES{$fn};
}
sub cmd_bfix {
@ARGV==1 or die USAGE;
my ($job_name)=@ARGV;
my $sth=$dbh->prepare(<<'EOF');
SELECT stat.rowid,stat_started,path_path,volume_path,job_name,volume_id
FROM stat,job,volume,path
WHERE stat_job_id=job_id
AND stat_volume_id=volume_id
AND job_path_id=path_id
AND job_name=?
ORDER BY stat_started
EOF
$sth->execute($job_name);
while (my $row=$sth->fetchrow_arrayref) {
my ($stat_rowid,$stat_started,$path_path,$volume_path,$job_name,$volume_id)=@$row;
my $run_name=run_name($job_name,$stat_started,$path_path);
# unless (file_exists_cached("$volume_path/$job_name",$run_name)) {
# $e->("$volume_path/$job_name/$run_name : missing");
printf "stat.rowid %-6d volume_id %-2d %s\n",$stat_rowid,$volume_id," $volume_path/$job_name/$run_name";
}
print "\n(command : pbackup sql 'update stat set stat_volume_id=VVV where rowid=RRR'\n";
}
sub cmd_verify {
upid_register_self();
upid_doing("VERIFY");
my $err=0;
my $e=sub {
$err++;
print "\n\t$_[0]";
};
$|=1;
print "every datadir has a volume_id ... ";
for my $datadir (datadirs()) {
my ($volume_id)=$dbh->selectrow_array('SELECT volume_id FROM volume WHERE volume_path=?',undef,$datadir);
defined $volume_id or $e->("datadir $datadir has no volume_id");
}
print $err ? "\n" : " ok\n";
print "every volume has a datadir ... ";
for (@{$dbh->selectcol_arrayref('SELECT volume_path FROM volume')}) {
-d "$_/." or $e->("volume_path $_ : $!\n");
}
print $err ? "\n" : " ok\n";
print "every run has a directory ... ";
for (@{$dbh->selectall_arrayref(<<'EOF')}) {
SELECT job_name,stat_started,path_path,volume_path
FROM job,stat,path,volume
WHERE stat_job_id=job_id AND job_path_id=path_id AND stat_volume_id=volume_id
ORDER by job_name
EOF
my ($job_name,$stat_started,$path_path,$volume_path)=@$_;
my $run_name=run_name($job_name,$stat_started,$path_path);
unless (file_exists_cached("$volume_path/$job_name",$run_name)) {
$e->("$volume_path/$job_name/$run_name : missing");
}
}
print $err ? "\n" : " ok\n";
print "every run directory is known ...";
for my $datadir (datadirs()) {
my ($volume_id)=$dbh->selectrow_array('SELECT volume_id FROM volume WHERE volume_path=?',undef,$datadir);
unless (defined $volume_id) {
$e->("datadir $datadir has no volume_id");
next;
}
unless (chdir "$datadir") {
$e->("$datadir: $!");
next;
}
opendir D,'.';
while ( defined (my $jobdir=readdir D) ) {
$jobdir=~/^\.\.?$/ and next;
if ($jobdir =~ /\.BEING_DELETED$/) {
$e->("$datadir/$jobdir : should be deleted");
next;
}
##print "$datadir/$jobdir ";
unless (opendir R,$jobdir) {
$e->("$datadir/$jobdir: $!");
next;
}
my $run_cnt=0;
while ( defined (my $rundir=readdir R) ) {
$rundir=~/^\.\.?$/ and next;
##print ".";
my ($job_name,$timetag,$fstag)=parse_run_name($rundir);
unless (defined ($job_name)) {
$e->("$datadir/$jobdir/$rundir : invalid name");
next;
}
if ($job_name ne $jobdir) {
$e->("$datadir/$jobdir/$rundir : conflicting run and job directory");
next;
}
if ($timetag eq 'INCOMPLETE') {
# is this a valid and enabled job which eventually will be continued ?
my $ok=0;
for my $path (@{$dbh->selectcol_arrayref('SELECT path_path FROM job,path WHERE job_path_id=path_id AND job_name=? AND job_enabled=1',undef,$job_name)}) {
fstag($path) ne $fstag and next;
$ok=1;
last;
}
unless ($ok) {
$e->("$datadir/$jobdir/$rundir : obsolete INCOMPLETE run");
} else {
$run_cnt++; # we count this as valid
}
next;
}
my $started=tag2time($timetag);
unless (defined $started) {
$e->("$datadir/$jobdir/$rundir : invalid timetag");
next;
}
my $ok=0;
for my $path (@{$dbh->selectcol_arrayref('SELECT path_path FROM job,path,stat WHERE job_path_id=path_id AND stat_job_id=job_id AND job_name=? AND stat_volume_id=?',undef,$job_name,$volume_id)}) {
fstag($path) ne $fstag and next;
$ok=1;
last;
}
unless ($ok) {
$e->("$datadir/$jobdir/$rundir : run not in database");
} else {
$run_cnt++; # we count this as valid
}
}
##print "\n";
$run_cnt or $e->("$datadir/$jobdir : no valid runs");
}
}
print $err ? "\n" : " ok\n";
}
sub db_init {
my ($cnt)=$dbh->selectrow_array('SELECT COUNT(*) FROM global');
unless ($cnt) {
warn "initialze empty database\n";
$dbh->do('INSERT INTO global DEFAULT VALUES');
}
}
db_open();
db_init();
@ARGV>=1 or die USAGE;
my $cmd=shift;
purge_zombies();
if ($cmd eq 'status') {
cmd_status();
} elsif ($cmd eq 'jobs') {
cmd_jobs();
} elsif ($cmd eq 'job') {
cmd_job(@ARGV);
} elsif ($cmd eq 'do_jobs') {
cmd_do_jobs();
} elsif ($cmd eq 'lsof') {
system "lsof | grep ${DATADIR}"
} elsif ($cmd eq 'amd_scan') {
cmd_amd_scan();
} elsif ($cmd eq 'stat') {
cmd_stat();
} elsif ($cmd eq 'move') {
cmd_move(@ARGV);
} elsif ($cmd eq 'balance') {
cmd_balance();
} elsif ($cmd eq 'migrate') {
cmd_migrate();
} elsif ($cmd eq 'monitor') {
system "${BINDIR}/iview 1 ${BINDIR}/pbackup status"
} elsif ($cmd eq 'fix_stat') {
cmd_fix_stat();
} elsif ($cmd eq 'sql') {
system('sqlite3','-header',"${SQLITEDBFILE}",@ARGV);
} elsif ($cmd eq 'expire') {
cmd_expire();
} elsif ($cmd eq 'kill') {
cmd_kill(@ARGV);
} elsif ($cmd eq 'verify') {
cmd_verify(@ARGV);
} elsif ($cmd eq 'bfix') {
cmd_bfix(@ARGV);
} elsif ($cmd eq 'test') {
cmd_test(@ARGV);
} elsif ($cmd eq 'disable') {
cmd_disable(@ARGV);
} else {
die USAGE;
}