# File: //usr/share/perl5/vendor_perl/Amavis/IO/SQL.pm
package Amavis::IO::SQL;
# an IO wrapper around SQL for inserting/retrieving mail text
# to/from a database
use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';
# use warnings 'extra'; no warnings 'experimental::re_strict'; use re 'strict';
# Compile-time package setup: loads Exporter, declares the package variables
# @ISA, @EXPORT, @EXPORT_OK, %EXPORT_TAGS and $VERSION, sets the module
# version and makes this package inherit from Exporter.  No export list is
# assigned in this block.
BEGIN {
require Exporter;
use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS $VERSION);
$VERSION = '2.412';
@ISA = qw(Exporter);
}
use Errno qw(ENOENT EACCES EIO);
use DBI qw(:sql_types);
# use DBD::Pg;
use Amavis::Util qw(ll do_log untaint min max minmax);
# Constructor.  Called without further arguments it returns a fresh, unopened
# object.  Any additional arguments are handed to open(); when open() returns
# a false value the constructor returns nothing.
sub new {
  my($class, @open_args) = @_;
  my $self = bless {}, $class;
  return $self  if !@open_args;
  $self->open(@open_args)  or return;
  return $self;
}
# Binds the object to a database statement for writing (mode 'w') or for
# reading (any other mode).
# Arguments, in this order: conn_h, clause, dbkey, mode, partition_tag,
# maxbuf, rx_time.
# An object that was already opened is closed first; a failure of that
# close is ignored.  In write mode the call is only logged.  In read mode
# the statement is executed and the first chunk is fetched right away; when
# no row is available $! is set to ENOENT and nothing is returned, so that a
# missing record looks like a missing file.  Database errors and timeouts
# are reported through die.  Returns the object itself on success.
sub open {
  my $self = shift;
  if (exists $self->{conn_h}) {
    eval { $self->close } or 1;  # ignore failure, make perlcritic happy
  }
  @$self{qw(conn_h clause dbkey mode partition_tag maxbuf rx_time)} = @_;
  $self->{buf} = '';
  $self->{$_} = 0  for qw(eof bufpos pos chunk_ind);

  my $conn_h = $self->{conn_h};
  my($driver, $err);
  if (!eval { $driver = $conn_h->driver_name;  1 }) {
    $err = $@ ne '' ? $@ : "errno=$!";  chomp $err;
  }
  # only a timeout is fatal here, other driver_name failures are tolerated
  die $err  if $err =~ /^timed out\b/;  # resignal timeout

  my($mode, $clause, $dbkey, $p_tag) =
    @$self{qw(mode clause dbkey partition_tag)};

  if ($mode eq 'w') {  # open for write access: nothing to do but log
    ll(4) && do_log(4,"Amavis::IO::SQL::open %s drv=%s (%s); key=%s, p_tag=%s",
                    $mode, $driver, $clause, $dbkey, $p_tag);
    return $self;
  }

  # open for read access: run the query
  $err = undef;
  eval {
    $conn_h->execute($clause, $p_tag, $dbkey);
    1;
  } or do { $err = $@ ne '' ? $@ : "errno=$!";  chomp $err };
  my $failed = $err ne '';
  my $level = $failed ? -1 : 4;
  if (ll($level)) {
    do_log($level,
           "Amavis::IO::SQL::open %s drv=%s (%s); key=%s, p_tag=%s, s: %s",
           $mode, $driver, $clause, $dbkey, $p_tag, $err);
  }
  if ($failed) {
    die $err  if $err =~ /^timed out\b/;  # resignal timeout
    die "Amavis::IO::SQL::open $driver SELECT error: $err";
  }

  # fetch the first chunk; if missing treat it as a file-not-found
  eval {
    my $row = $conn_h->fetchrow_arrayref($clause);
    if (defined $row) { $self->{buf} = $row->[0];  $self->{chunk_ind}++ }
    else { $self->{eof} = 1 }
    1;
  } or do {
    my $msg = $@ ne '' ? $@ : "errno=$!";  chomp $msg;
    die $msg  if $msg =~ /^timed out\b/;  # resignal timeout
    die "Amavis::IO::SQL::open $driver read error: $msg";
  };

  return $self  if !$self->{eof};

  # no records, make it look like a missing file
  do_log(0,"Amavis::IO::SQL::open key=%s, p_tag=%s: no such record",
         $dbkey, $p_tag);
  $! = ENOENT;  # No such file or directory
  return;
}
# Destructor: if the object still holds a connection handle, close it.
# A failing close is downgraded to a warning tagged with the process id,
# and the connection handle reference is dropped in any case.
# $@, $! and $_ are localized so the destructor does not disturb the caller.
sub DESTROY {
  my $self = $_[0];
  local($@,$!,$_);
  my $pid = $$;
  return  unless $self && $self->{conn_h};
  if (!eval { $self->close or die "Error closing: $!";  1 }) {
    my $reason = $@ ne '' ? $@ : "errno=$!";  chomp $reason;
    warn "[$pid] Amavis::IO::SQL::close error: $reason";
  }
  delete $self->{conn_h};
}
# Ends the current session on this object.
# In write mode any buffered text is flushed first; in read mode a statement
# that was not read up to eof is finished.  Afterwards every per-session
# field stored by open() is removed from the object, regardless of whether
# the flush/finish succeeded, so the object never keeps stale state
# (this includes partition_tag, which open() sets along with the others).
# Returns 1 on success; dies on failure, re-raising a timeout unchanged.
sub close {
  my $self = $_[0];
  my $eval_stat;
  eval {
    if ($self->{mode} eq 'w') {
      $self->flush  or die "Can't flush: $!";
    } elsif ($self->{conn_h} && $self->{clause} && !$self->{eof}) {
      # reading, closing before eof was reached
      $self->{conn_h}->finish($self->{clause})  or die "Can't finish: $!";
    }
    1;
  } or do {
    $eval_stat = $@ ne '' ? $@ : "errno=$!";
  };
  # forget everything open() and the read/write methods stored in the object
  delete @$self{
    qw(conn_h clause dbkey mode partition_tag maxbuf rx_time
       buf chunk_ind pos bufpos eof) };
  if (defined $eval_stat) {
    chomp $eval_stat;
    die $eval_stat  if $eval_stat =~ /^timed out\b/;  # resignal timeout
    # an arbitrary message can't be stashed into $!, so report it with die
    die "Error closing, $eval_stat";
  }
  1;
}
# Positions a read-mode object at absolute offset $pos.
# Only $whence == 0 is accepted, and only on objects not opened for writing.
# Seeking backwards either rewinds within the retained first chunk, or
# closes and reopens the object with its saved open() arguments to restart
# the query.  Seeking forwards is done by reading and discarding the bytes
# in between.  Returns 1; every failure is reported through die.
sub seek {
  my($self,$pos,$whence) = @_;
  die "Only absolute seek is supported on sql i/o"
    unless $whence == 0;
  die "Can't seek to a negative absolute position on sql i/o"
    unless $pos >= 0;
  if (ll(5)) {
    do_log(5, "Amavis::IO::SQL::seek mode=%s, pos=%s", $self->{mode}, $pos);
  }
  die "Seek to $whence,$pos on sql i/o only supported for read mode"
    if $self->{mode} eq 'w';

  if ($pos < $self->{pos}) {  # going backwards
    my $within_first_chunk = !$self->{eof} && $self->{chunk_ind} <= 1;
    if ($within_first_chunk) {
      # still in the first chunk, which is kept in buf: just rewind
      @$self{qw(pos bufpos)} = (0, 0);
    } else {
      # beyond the first chunk, restart the query from the beginning
      my @reopen_args =
        @$self{qw(conn_h clause dbkey mode partition_tag maxbuf rx_time)};
      $self->close  or die "seek: error closing, $!";
      $self->open(@reopen_args)  or die "seek: reopen failed: $!";
    }
  }

  my $skip = $pos - $self->{pos};
  if ($skip > 0) {  # read and throw away; acceptable for small skips
    my $discarded;
    my $nbytes = $self->read($discarded,$skip);
    die "seek: error skipping $skip bytes on sql i/o: $!"
      unless defined $nbytes;
  }
  1;  # seek is supposed to return 1 upon success, 0 otherwise
}
# read(SCALAR, LENGTH, OFFSET)
# Reads up to LENGTH characters of the stored text into SCALAR, fetching
# further rows from the database as needed.  Without OFFSET (or with an
# OFFSET of 0) SCALAR is replaced by the data read; otherwise the data is
# placed into SCALAR starting at OFFSET, replacing whatever followed.  As
# with Perl's builtin read(), a positive OFFSET beyond the current length of
# SCALAR (or an undefined SCALAR) causes SCALAR to be padded with "\0" bytes
# first, instead of dying with 'substr outside of string'.
# Returns the number of characters read, 0 at eof; database errors and
# timeouts are reported through die.
sub read {  # SCALAR,LENGTH,OFFSET
  my $self = shift;  my $req_len = $_[1];  my $offset = $_[2];
  my $conn_h = $self->{conn_h};
  ll(5) && do_log(5, "Amavis::IO::SQL::read, %d, %d",
                  $self->{chunk_ind}, $self->{bufpos});
  eval {
    # collect rows until enough unread text is buffered or no rows are left
    while (!$self->{eof} && length($self->{buf})-$self->{bufpos} < $req_len) {
      my $a_ref = $conn_h->fetchrow_arrayref($self->{clause});
      if (!defined($a_ref)) { $self->{eof} = 1 }
      else { $self->{buf} .= $a_ref->[0];  $self->{chunk_ind}++ }
    }
    1;
  } or do {
    my $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat;
    # we can't stash an arbitrary error message string into $!,
    # which forces us to use 'die' to properly report an error
    die $eval_stat  if $eval_stat =~ /^timed out\b/;  # resignal timeout
    die "read: sql select failed, $eval_stat";
  };
  my $chunk = substr($self->{buf}, $self->{bufpos}, $req_len);
  my $nbytes = length($chunk);
  if (!defined($offset) || $offset == 0) {
    $_[0] = $chunk;
  } else {
    # an lvalue substr beyond the end of the string is a fatal error,
    # so grow the target to OFFSET first, the way builtin read() does
    $_[0] = ''  if !defined $_[0];
    my $missing = $offset - length($_[0]);
    $_[0] .= "\0" x $missing  if $missing > 0;
    substr($_[0],$offset) = $chunk;
  }
  $self->{bufpos} += $nbytes;  $self->{pos} += $nbytes;
  if ($self->{bufpos} > 0 && $self->{chunk_ind} > 1) {
    # discard used-up part of the buf unless at ch.1, which may still be useful
    ll(5) && do_log(5,"read: moving on by %d chars", $self->{bufpos});
    $self->{buf} = substr($self->{buf},$self->{bufpos});  $self->{bufpos} = 0;
  }
  $nbytes;  # eof: 0, error: reported by die
}
# Returns the next line of the stored text including its trailing newline,
# fetching further rows from the database as needed.  If the text ends
# without a newline, one is implied for the last line.  Once all text has
# been returned, undef is returned with $! set to 0.  A newline is implied
# only while unread text remains; otherwise every call after eof would
# yield another "\n" line and a loop reading until undef would never end.
# Database errors and timeouts are reported through die.
sub getline {
  my $self = $_[0];  my $conn_h = $self->{conn_h};
  ll(5) && do_log(5, "Amavis::IO::SQL::getline, chunk %d, pos %d",
                  $self->{chunk_ind}, $self->{bufpos});
  my $line;  my $ind = -1;
  eval {
    # collect rows until the unread part of buf holds a newline, or eof
    while (!$self->{eof} &&
           ($ind=index($self->{buf},"\n",$self->{bufpos})) < 0) {
      my $a_ref = $conn_h->fetchrow_arrayref($self->{clause});
      if (!defined($a_ref)) { $self->{eof} = 1 }
      else { $self->{buf} .= $a_ref->[0];  $self->{chunk_ind}++ }
    }
    1;
  } or do {
    my $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat;
    die $eval_stat  if $eval_stat =~ /^timed out\b/;  # resignal timeout
    die "getline: reading sql select results failed, $eval_stat";
  };
  if ($ind < 0) {
    # the loop above only ends without a newline when eof is set; it is
    # skipped entirely when eof was already set, so search the buffer here
    $ind = index($self->{buf}, "\n", $self->{bufpos});
    if ($ind < 0 && length($self->{buf}) > $self->{bufpos}) {
      # unterminated last line: imply a NL before eof
      $self->{buf} .= "\n";  $ind = length($self->{buf}) - 1;
    }
  }
  if ($ind >= 0) {  # a complete line is available
    $line = substr($self->{buf}, $self->{bufpos}, $ind+1-$self->{bufpos});
    my $nbytes = length($line);
    $self->{bufpos} += $nbytes;  $self->{pos} += $nbytes;
    if ($self->{bufpos} > 0 && $self->{chunk_ind} > 1) {
      # discard used part of the buf unless at ch.1, which may still be useful
      ll(5) && do_log(5,"getline: moving on by %d chars", $self->{bufpos});
      $self->{buf} = substr($self->{buf},$self->{bufpos});
      $self->{bufpos} = 0;
    }
  }
  # eof: undef, $! zero; errors are reported by die
  $! = 0;
  defined $line && $line ne '' ? $line : undef;
}
# Writes all buffered text to the database, one row per chunk of at most
# maxbuf characters, numbering the chunks consecutively after chunk_ind.
# Does nothing (and returns nothing) unless the object is in write mode.
# Returns 1 on success; the first failing insert stops the loop and is
# reported through die, a timeout being re-raised unchanged.
sub flush {
  my $self = $_[0];
  return  if $self->{mode} ne 'w';
  my $conn_h = $self->{conn_h};
  my $failure;
  until ($self->{buf} eq '') {
    my $next_ind = $self->{chunk_ind} + 1;
    if (ll(4)) {
      do_log(4, "sql flush: key: (%s, %d), p_tag=%s, rx_t=%d, size=%d",
             $self->{dbkey}, $next_ind, $self->{partition_tag},
             $self->{rx_time}, min(length($self->{buf}),$self->{maxbuf}));
    }
    my $inserted = eval {
      my $driver = $conn_h->driver_name;
      my $chunk = untaint(substr($self->{buf},0,$self->{maxbuf}));
      # the chunk is bound as binary data, the bind type depends on driver
      my $bind_type = $driver eq 'Pg' ? { pg_type => DBD::Pg::PG_BYTEA() }
                                      : SQL_BLOB;
      $conn_h->execute($self->{clause},
                       $self->{partition_tag}, $self->{dbkey}, $next_ind,
                     # int($self->{rx_time}),
                       [ $chunk, $bind_type ] );
      1;
    };
    if (!$inserted) {
      $failure = $@ ne '' ? $@ : "errno=$!";  chomp $failure;
      last;
    }
    # the chunk is stored, drop it from the buffer
    substr($self->{buf},0,$self->{maxbuf}) = '';
    $self->{chunk_ind} = $next_ind;
  }
  if (defined $failure) {
    chomp $failure;
    die $failure  if $failure =~ /^timed out\b/;  # resignal timeout
    # we can't stash an arbitrary error message string into $!,
    # which forces us to use 'die' to properly report an error
    die "flush: sql inserting text failed, $failure";
  }
  1;
}
# Appends the given strings to the write buffer and stores every complete
# chunk of maxbuf characters as a database row; a remainder shorter than
# maxbuf stays buffered.
# Dies unless the object was opened for writing.  Returns the number of
# characters accepted, or "0 but true" when there was nothing to print.
# Insert failures and timeouts are reported through die.
sub print {
  my $self = shift;
  die "Can't print, not opened for writing"  unless $self->{mode} eq 'w';
  # avoid copying the text when a single string was given
  my $text_ref = @_ == 1 ? \$_[0] : \join('',@_);
  my $len = length($$text_ref);
  return "0 but true"  if $len <= 0;

  $self->{buf} .= $$text_ref;
  $self->{pos} += $len;
  my $conn_h = $self->{conn_h};
  my $maxbuf = $self->{maxbuf};
  while (length($self->{buf}) >= $maxbuf) {
    my $next_ind = $self->{chunk_ind} + 1;
    if (ll(4)) {
      do_log(4, "sql print: key: (%s, %d), p_tag=%s, size=%d",
             $self->{dbkey}, $next_ind, $self->{partition_tag}, $maxbuf);
    }
    eval {
      my $driver = $conn_h->driver_name;
      my $chunk = untaint(substr($self->{buf},0,$maxbuf));
      # the chunk is bound as binary data, the bind type depends on driver
      my $bind_type = $driver eq 'Pg' ? { pg_type => DBD::Pg::PG_BYTEA() }
                                      : SQL_BLOB;
      $conn_h->execute($self->{clause},
                       $self->{partition_tag}, $self->{dbkey}, $next_ind,
                     # int($self->{rx_time}),
                       [ $chunk, $bind_type ] );
      1;
    } or do {
      my $reason = $@ ne '' ? $@ : "errno=$!";  chomp $reason;
      # we can't stash an arbitrary error message string into $!,
      # which forces us to use 'die' to properly report an error
      die $reason  if $reason =~ /^timed out\b/;  # resignal timeout
      die "print: sql inserting mail text failed, $reason";
    };
    # the chunk is stored, drop it from the buffer
    substr($self->{buf},0,$maxbuf) = '';
    $self->{chunk_ind} = $next_ind;
  }
  $len;
}
# Formats the arguments with sprintf (first argument is the format string)
# and passes the resulting string to print(), returning print()'s result.
sub printf {
  my($self, $format, @args) = @_;
  $self->print(sprintf($format, @args));
}
1;