File: //usr/share/perl5/vendor_perl/Amavis/IO/RW.pm
# SPDX-License-Identifier: GPL-2.0-or-later
package Amavis::IO::RW;
use strict;
use re 'taint';
BEGIN {
require Exporter;
use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS $VERSION);
$VERSION = '2.412';
@ISA = qw(Exporter);
}
use Errno qw(EIO EINTR EAGAIN EPIPE ENOTCONN ECONNRESET);
use Time::HiRes ();
use IO::Socket;
use IO::Socket::UNIX;
use IO::Socket::SSL;
use Amavis::Conf qw(:platform %smtp_tls_client_verifycn_name_maps);
use Amavis::Util qw(ll do_log min max minmax idn_to_ascii);
# Connect to one of the specified sockets. The $socket_specs may be a
# simple string ([inet-host]:port, [inet6-host]:port, or a unix socket name),
# optionally prefixed by a protocol name (scheme) and a colon (the prefix is
# ignored here, just avoids a need for parsing by a caller); or it can be
# a ref to a list of such socket specifications, which are tried one after
# another until a connection is successful. In case of a listref, it leaves
# a good socket as the first entry in the list so that it will be tried first
# on a next call.
# The 'Timeout' argument controls both the connect timeout as well as the
# timeout of a select() call in rw_loop() - but may be changed through a
# timeout() method.
#
sub new {
my($class, $socket_specs, %arg) = @_;
my $self = bless {}, $class;
$self->timeout($arg{Timeout});
$self->{eol_str} = !defined $arg{Eol} ? "\n" : $arg{Eol};
$self->{last_written_potential_eol} = '';
$self->{inp_sane_size} = !$arg{InpSaneSize} ? 500000 : $arg{InpSaneSize};
$self->{last_event_time} = 0; $self->{last_event_tx_time} = 0;
$self->{inp} = ''; $self->{out} = '';
$self->{inpeof} = 0; $self->{ssl_active} = 0;
$socket_specs = [ $socket_specs ] if !ref $socket_specs;
my($protocol,$socketname,$sock,$eval_stat);
my $attempts = 0; my(@failures);
my $n_candidates = scalar @$socket_specs;
$n_candidates > 0 or die "Can't connect, no sockets specified!?"; # sanity
for (;;) {
if ($n_candidates > 1) { # pick one at random, put it to head of the list
my $j = int(rand($n_candidates));
ll(5) && do_log(5, "picking candidate #%d (of %d) in %s",
$j+1, $n_candidates, join(', ',@$socket_specs));
@$socket_specs[0,$j] = @$socket_specs[$j,0] if $j != 0;
}
$socketname = $socket_specs->[0]; # try the first on the list
local($1);
$socketname =~ s/^([a-z][a-z0-9.+-]*)?://si; # strip protocol name
$protocol = lc($1); # kept for the benefit of a caller
$self->{socketname} = undef;
$attempts++;
eval {
$sock = $self->connect_attempt($socketname, %arg);
$sock or die "Error connecting to socket $socketname\n";
1;
} or do {
$eval_stat = $@ ne '' ? $@ : "errno=$!"; chomp $eval_stat;
undef $sock;
};
if ($sock) { # mission accomplished
if (!@failures) {
do_log(5, "connected to %s successfully", $self->{socketname});
} else {
do_log(1, "connected to %s successfully after %d failures on: %s",
$self->{socketname}, scalar(@failures), join(', ',@failures));
}
last;
} else { # failure, prepare for a retry with a next entry if any
$n_candidates--;
my $ll = $attempts > 1 || $n_candidates <= 0 ? -1 : 1;
ll($ll) && do_log($ll, "connect to %s failed, attempt #%d: %s%s",
$socketname, $attempts, $eval_stat,
$n_candidates <= 0 ? '' : ', trying next');
push(@failures, $socketname);
# circular shift left, move a bad candidate to the end of the list
push(@$socket_specs, shift @$socket_specs) if @$socket_specs > 1;
last if $n_candidates <= 0;
}
}
$sock or die("All attempts ($attempts) failed connecting to ".
join(', ',@$socket_specs) . "\n");
$self->{socket} = $sock;
$self->{protocol} = $protocol;
$self;
}
sub connect_attempt {
my($self, $socketname, %arg) = @_;
my $sock;
my($localaddr, $localport) = ($arg{LocalAddr}, $arg{LocalPort});
my $blocking = 1; # blocking mode defaults to on
$blocking = 0 if defined $arg{Blocking} && !$arg{Blocking};
my $timeout = $self->timeout;
my $timeout_displ = !defined $timeout ? 'undef'
: int($timeout) == $timeout ? "$timeout"
: sprintf("%.3f",$timeout);
my($peeraddress, $peerport, $is_inet); local($1,$2,$3);
if ($socketname =~ m{^/}) { # simpleminded: unix vs. inet
$is_inet = 0;
} elsif ($socketname =~ /^(?: \[ ([^\]]*) \] | ([^:]*) ) : ([^:]*)/xs) {
# ignore possible further fields after the "proto:addr:port:..." last colon
$peeraddress = defined $1 ? $1 : $2; $peerport = $3; $is_inet = 1;
} elsif ($socketname =~ /^(?: \[ ([^\]]*) \] | ([0-9a-fA-F.:]+) ) \z/xs) {
$peeraddress = defined $1 ? $1 : $2; $is_inet = 1;
} else { # probably a syntax error, but let's assume it is a Unix socket
$is_inet = 0;
}
if ($is_inet) {
if (defined $peeraddress && $peeraddress eq '*') {
$peeraddress = $arg{WildcardImpliedHost};
defined $peeraddress
or die "Wildcarded host, but client's address not known: $socketname";
}
if (!defined $peeraddress || $peeraddress eq '') {
die "Empty/unknown host address in socket specification: $socketname";
}
$peerport = $arg{Port} if !defined $peerport || $peerport eq '';
if (defined $peerport && $peerport eq '*') {
$peerport = $arg{WildcardImpliedPort};
defined $peerport
or die "Wildcarded port, but client's port not known: $socketname";
}
if (!defined $peerport || $peerport eq '') {
die "Empty/unknown port number in socket specification: $socketname";
} elsif ($peerport !~ /^\d{1,5}\z/ || $peerport < 1 || $peerport > 65535) {
die "Invalid port number in socket specification: $socketname";
}
}
$self->{last_event_time} = $self->{last_event_tx_time} = Time::HiRes::time;
if (!$is_inet) {
# unix socket
ll(3) && do_log(3, "new socket by IO::Socket::UNIX to %s, ".
"timeout set to %s", $socketname, $timeout_displ);
$sock = IO::Socket::UNIX->new(
# Domain => AF_UNIX,
Type => SOCK_STREAM, Timeout => $timeout);
$sock or die "Can't create UNIX socket: $!\n";
$sock->connect( pack_sockaddr_un($socketname) )
or die "Can't connect to a UNIX socket $socketname: $!\n";
$self->{last_event} = 'new-unix';
} else { # inet or inet6
defined $io_socket_module_name
or die "No INET or INET6 socket module is available";
my $local_sock_displ = '';
$peeraddress = idn_to_ascii($peeraddress);
my(%args) = (Type => SOCK_STREAM, Proto => 'tcp', Blocking => $blocking,
PeerAddr => $peeraddress, PeerPort => $peerport);
# Timeout => $timeout, # produces: Invalid argument
if (defined $localaddr && $localaddr ne '') {
$args{LocalAddr} = $localaddr;
$local_sock_displ .= '[' . $localaddr . ']';
}
if (defined $localport && $localport ne '') {
$args{LocalPort} = $localport;
$local_sock_displ .= ':' . $localport;
}
ll(3) && do_log(3,"new socket using %s to [%s]:%s, timeout %s%s%s",
$io_socket_module_name, $peeraddress, $peerport,
$timeout_displ, $blocking ? '' : ', nonblocking',
$local_sock_displ eq '' ? ''
: ', local '.$local_sock_displ);
$sock = $io_socket_module_name->new(%args);
if (!$sock) {
# note: the IO::Socket::IP constructor provides an error message in $@
die sprintf("Can't connect to socket %s using module %s: %s\n",
$socketname, $io_socket_module_name,
$io_socket_module_name eq 'IO::Socket::IP' ? $@ : $!);
}
$self->{last_event} = 'new-' . $io_socket_module_name;
}
if ($sock) {
$self->{socketname} = $is_inet ? "[$peeraddress]:$peerport" : $socketname;
$self->{verifycn_name} = $smtp_tls_client_verifycn_name_maps{ $socketname } // ($is_inet ? $peeraddress : undef);
do_log(3, 'verifycn_name: %s', $self->{verifycn_name});
}
$sock;
}
sub internal_close {
my($self, $destroying) = @_;
my $sock = $self->{socket};
my $status = 1; # ok
if (!defined($sock)) {
# nothing to do
} elsif (!defined fileno($sock)) { # not really open
$sock->close; # ignoring errors
} else {
my $flush_status = 1; # ok
eval { # don't let errors during flush prevent us from closing a socket
$flush_status = $self->flush;
} or do {
undef $flush_status; # false, indicates a signalled failure
my $eval_stat = $@ ne '' ? $@ : "errno=$!"; chomp $eval_stat;
do_log($destroying ? 5 : 1,
"closing: Error flushing socket in Amavis::IO::RW::%s: %s",
$destroying?'DESTROY':'close', $eval_stat);
};
$self->{last_event} = 'close';
$self->{last_event_time} = $self->{last_event_tx_time} = Time::HiRes::time;
$! = 0; $status = $sock->close;
$status or do_log($destroying ? 5 : 1,
"closing: Error closing socket in Amavis::IO::RW::%s: %s",
$destroying?'DESTROY':'close',
!$self->{ssl_active} ? $! : $sock->errstr.", $!" );
$status = $flush_status if $status && !$flush_status;
}
$status;
}
sub close {
my $self = $_[0];
$self->internal_close(0);
}
sub DESTROY {
my $self = $_[0]; local($@,$!,$_);
# ignore failure, make perlcritic happy
eval { $self->internal_close(1) } or 1;
}
sub rw_loop {
my($self,$needline,$flushoutput) = @_;
#
# RFC 2920: Client SMTP implementations MAY elect to operate in a nonblocking
# fashion, processing server responses immediately upon receipt, even if
# there is still data pending transmission from the client's previous TCP
# send operation. If nonblocking operation is not supported, however, client
# SMTP implementations MUST also check the TCP window size and make sure that
# each group of commands fits entirely within the window. The window size
# is usually, but not always, 4K octets. Failure to perform this check can
# lead to deadlock conditions.
#
# We choose to operate in a nonblocking mode. Responses are read as soon as
# they become available and stored for later, but not immediately processed
# as they come in. This requires some sanity limiting against rogue servers.
#
my $sock = $self->{socket};
my $fd_sock = fileno($sock);
my $timeout = $self->timeout;
my $timeout_displ = !defined $timeout ? 'undef'
: int($timeout) == $timeout ? "$timeout"
: sprintf("%.3f",$timeout);
my $eol_str = $self->{eol_str};
my $idle_cnt = 0; my $failed_write_attempts = 0;
local $SIG{PIPE} = 'IGNORE'; # don't signal on a write to a widowed pipe
for (;;) {
$idle_cnt++;
my($rout,$wout,$eout,$rin,$win,$ein); $rin=$win=$ein='';
my $want_to_write = $self->{out} ne '' && ($flushoutput || $needline);
ll(5) && do_log(5, 'rw_loop: needline=%d, flush=%s, wr=%d, timeout=%s',
$needline, $flushoutput, $want_to_write, $timeout_displ);
if (!defined($fd_sock)) {
do_log(3, 'rw_loop read: got a closed socket');
$self->{inpeof} = 1; last;
}
vec($rin,$fd_sock,1) = 1;
vec($win,$fd_sock,1) = $want_to_write ? 1 : 0;
$ein = $rin | $win;
$self->{last_event} = 'select';
$self->{last_event_time} = Time::HiRes::time;
my($nfound,$timeleft) =
select($rout=$rin, $wout=$win, $eout=$ein, $timeout);
defined $nfound && $nfound >= 0
or die "Select failed: ".
(!$self->{ssl_active} ? $! : $sock->errstr.", $!");
if (vec($rout,$fd_sock,1)) {
ll(5) && do_log(5, 'rw_loop: receiving');
my $inbuf = ''; $! = 0;
my $nread = sysread($sock,$inbuf,16384);
if ($nread) { # successful read
$self->{last_event} = 'read-ok';
$self->{inpeof} = 0;
ll(5) && do_log(5,'rw_loop read %d chars< %s', length($inbuf),$inbuf);
$self->{inp} .= $inbuf; $idle_cnt = 0;
length($self->{inp}) < $self->{inp_sane_size}
or die "rw_loop: Aborting on a runaway server, inp_len=" .
length($self->{inp});
} elsif (defined $nread) { # defined but zero, sysread returns 0 at eof
$self->{last_event} = 'read-eof';
$self->{inpeof} = 1; do_log(3, 'rw_loop read: got eof');
} elsif ($! == EAGAIN || $! == EINTR) {
$self->{last_event} = 'read-intr'.(0+$!);
$idle_cnt = 0;
do_log($SSL_ERROR == SSL_WANT_READ ? 4 : 2,
'rw_loop read interrupted: %s',
!$self->{ssl_active} ? $! : $sock->errstr.", $!");
Time::HiRes::sleep(0.1); # slow down, just in case
# retry
} else {
$self->{last_event} = 'read-fail';
$self->{inpeof} = 1;
die "Error reading from socket: ".
(!$self->{ssl_active} ? $! : $sock->errstr.", $!");
}
$self->{last_event_time} = Time::HiRes::time;
}
if (vec($wout,$fd_sock,1)) {
my $out_l = length($self->{out});
ll(5) && do_log(5,'rw_loop: sending %d chars', $out_l);
my $nwrite = syswrite($sock, $self->{out});
if (!defined($nwrite)) {
if ($! == EAGAIN || $! == EINTR) {
$self->{last_event} = 'write-intr'.(0+$!);
$idle_cnt = 0; $failed_write_attempts++;
do_log(2, 'rw_loop writing %d bytes interrupted: %s', $out_l,
!$self->{ssl_active} ? $! : $sock->errstr.", $!");
Time::HiRes::sleep(0.1); # slow down, just in case
} else {
$self->{last_event} = 'write-fail';
die sprintf('Error writing %d bytes to socket: %s', $out_l,
!$self->{ssl_active} ? $! : $sock->errstr.", $!");
}
} else { # successful write
$self->{last_event} = 'write-ok';
my $ll = $nwrite != $out_l ? 4 : 5;
if (ll($ll)) {
my $msg = $nwrite==$out_l ? sprintf("%d", $nwrite)
: sprintf("%d (of %d)", $nwrite,$out_l);
my $nlog = min(200,$nwrite);
do_log($ll, 'rw_loop sent %s> %s%s',
$msg, substr($self->{out},0,$nlog), $nlog<$nwrite?' [...]':'');
};
$idle_cnt = 0;
if ($nwrite <= 0) { $failed_write_attempts++ }
elsif ($nwrite < $out_l) { substr($self->{out},0,$nwrite) = '' }
else { $self->{out} = '' }
}
$self->{last_event_time} = $self->{last_event_tx_time} =
Time::HiRes::time;
}
if ( ( !$needline || !defined($eol_str) || $eol_str eq '' ||
index($self->{inp},$eol_str) >= 0 ) &&
( !$flushoutput || $self->{out} eq '' ) ) {
last;
}
if ($self->{inpeof}) {
if ($self->{out} ne '') {
do_log(2, 'rw_loop: EOF on input, output buffer not yet empty');
}
last;
}
if ($idle_cnt > 0) { # probably exceeded timeout in select
do_log(-1, 'rw_loop: leaving rw loop, no progress, '.
'last event (%s) %.3f s ago', $self->{last_event},
Time::HiRes::time - $self->{last_event_time});
last;
}
$failed_write_attempts < 100 or die "rw_loop: Aborting stalled sending";
}
}
sub socketname
{ @_<2 ? shift->{socketname} : ($_[0]->{socketname} = $_[1]) }
sub protocol
{ @_<2 ? shift->{protocol} : ($_[0]->{protocol} = $_[1]) }
sub timeout
{ @_<2 ? shift->{timeout} : ($_[0]->{timeout} = $_[1]) }
sub ssl_active
{ @_<2 ? shift->{ssl_active} : ($_[0]->{ssl_active} = $_[1]) }
sub eof
{ @_<2 ? shift->{client_ip} : ($_[0]->{client_ip} = $_[1]) }
sub last_io_event_timestamp
{ my($self,$keyword) = @_; $self->{last_event_time} }
sub last_io_event_tx_timestamp
{ my($self,$keyword) = @_; $self->{last_event_tx_time} }
sub flush
{ my $self = $_[0]; $self->rw_loop(0,1) if $self->{out} ne ''; 1 }
sub discard_pending_output
{ my $self = $_[0]; my $len = length $self->{out}; $self->{out} = ''; $len }
sub out_buff_large
{ my $self = $_[0]; length $self->{out} > 40000 }
sub print {
my $self = shift;
$self->{out} .= $_ for @_;
$self->_update_last_written_potential_eol;
# $self->out_buff_large ? $self->flush : 1;
length $self->{out} > 40000 ? $self->flush : 1; # inlined out_buff_large()
}
# $self->{last_written_potential_eol} is used by $self->at_line_boundary
# to determine if we are at a line boundary. The idea here is that
# $self->{last_written_potential_eol} always contains the last bytes from
# $self->{out} with the same length as $self->{eol_str}.
sub _update_last_written_potential_eol {
my $self = shift;
return unless defined($self->{eol_str});
my $eol_l = length($self->{eol_str});
my $new_eol = '';
my $missing = $eol_l;
foreach my $last_bytes ($self->{out}, $self->{last_written_potential_eol}) {
last unless $missing;
$new_eol = substr($last_bytes, -$missing) . $new_eol;
$missing = $eol_l - length($new_eol);
}
$self->{last_written_potential_eol} = $new_eol;
}
sub at_line_boundary {
my $self = $_[0];
return 0 unless defined($self->{eol_str});
# This either happens when $self->{eol_str} is an empty string, in
# which case everything is at a line boundary, or if nothing has
# been written yet, so we are at the start (boundary) of the first
# line.
return 1 if $self->{last_written_potential_eol} eq '';
# Since $self->{last_written_potential_eol} always contains the last written bytes,
# even if $self->{out} was flushed, we can just check if it equals
# $self->{eol_str}.
return $self->{last_written_potential_eol} eq $self->{eol_str};
}
# returns true if there is any full line (or last incomplete line)
# in the buffer waiting to be read, 0 otherwise, undef on eof or error
#
sub response_line_available {
my $self = $_[0];
my $eol_str = $self->{eol_str};
if (!defined $eol_str || $eol_str eq '') {
return length($self->{inp});
} elsif (index($self->{inp},$eol_str) >= 0) {
return 1;
} elsif ($self->{inpeof} && $self->{inp} eq '') {
return; # undef on end-of-file
} elsif ($self->{inpeof}) { # partial last line
return length($self->{inp});
}
}
# get one full text line, or last partial line, or undef on eof/error/timeout
#
sub get_response_line {
my $self = $_[0];
my $ind; my $attempts = 0;
my $eol_str = $self->{eol_str};
my $eol_str_l = !defined($eol_str) ? 0 : length($eol_str);
for (;;) {
if (!$eol_str_l) {
my $str = $self->{inp}; $self->{inp} = ''; return $str;
} elsif (($ind=index($self->{inp},$eol_str)) >= 0) {
return substr($self->{inp},0,$ind+$eol_str_l,'');
} elsif ($self->{inpeof} && $self->{inp} eq '') {
$! = 0; return; # undef on end-of-file
} elsif ($self->{inpeof}) { # return partial last line
my $str = $self->{inp}; $self->{inp} = ''; return $str;
} elsif ($attempts > 0) {
$! = EIO; return; # timeout or error
}
# try reading some more input, one attempt only
$self->rw_loop(1,0); $attempts++;
}
}
# read whatever is available, up to LENGTH bytes
#
sub read { # SCALAR,LENGTH,OFFSET
my $self = shift; my $len = $_[1]; my $offset = $_[2];
defined $len or die "Amavis::IO::RW::read: length argument undefined";
$len >= 0 or die "Amavis::IO::RW::read: length argument negative";
$self->rw_loop(0,0);
my $nbytes = length($self->{inp});
$nbytes = $len if $len < $nbytes;
if (!defined($offset) || $offset == 0) {
$_[0] = substr($self->{inp}, 0, $len, '');
} else {
substr($_[0],$offset) = substr($self->{inp}, 0, $len, '');
}
$nbytes; # eof: 0; error: undef
}
use vars qw($ssl_cache);
sub ssl_upgrade {
my($self, %tls_options) = @_;
$self->flush;
IO::Socket::SSL->VERSION(1.05); # required minimal version
$ssl_cache = IO::Socket::SSL::Session_Cache->new(2) if !defined $ssl_cache;
my $sock = $self->{socket};
IO::Socket::SSL->start_SSL($sock,
SSL_session_cache => $ssl_cache,
SSL_error_trap => sub {
my($sock,$msg) = @_;
do_log(-2,"Upgrading socket to TLS failed (in ssl_upgrade): %s", $msg);
},
(defined($self->{verifycn_name})
? (SSL_verifycn_name => $self->{verifycn_name})
: ()
),
%tls_options,
) or die "Error upgrading output socket to TLS: ".IO::Socket::SSL::errstr();
$self->{last_event} = 'ssl-upgrade';
$self->{last_event_time} = $self->{last_event_tx_time} = Time::HiRes::time;
$self->{ssl_active} = 1;
# An IO::Socket::SSL socket can block a sysread
# even if selected for read. See issue 74 and
# perldoc IO::Socket::SSL `Using Non-Blocking Sockets`
if (defined $sock->blocking(0)) {
do_log(4, "Setting TLS socket to non-blocking");
} else {
die "Error setting TLS socket to non-blocking: $!";
}
ll(3) && do_log(3,"TLS cipher: %s", $sock->get_cipher);
ll(5) && do_log(5,"TLS certif: %s", $sock->dump_peer_certificate);
1;
}
1;