File: //usr/share/perl5/vendor_perl/Amavis/Redis.pm
# SPDX-License-Identifier: GPL-2.0-or-later
package Amavis::Redis;
use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';
# use warnings 'extra'; no warnings 'experimental::re_strict'; use re 'strict';
# Compile-time module setup: load Exporter, declare the package globals
# used by it, and set the module version and inheritance.
BEGIN {
require Exporter;
use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS $VERSION);
$VERSION = '2.412';
@ISA = qw(Exporter);
}
use Amavis::Conf qw(:platform :confvars c cr ca);
use Amavis::JSON;
use Amavis::Lookup::IP qw(lookup_ip_acl normalize_ip_addr);
use Amavis::rfc2821_2822_Tools;
use Amavis::Timing qw(section_time);
use Amavis::Util qw(ll do_log do_log_safe min max minmax untaint
safe_encode safe_encode_utf8 idn_to_ascii
format_time_interval unique_list snmp_count);
use Amavis::TinyRedis;
# Constructor. Remembers the list of redis DSN entries which connect() will
# try later; no connection is opened here.
sub new {
  my $class = shift;
  my @redis_dsn = @_;
  my $self = { redis_dsn => \@redis_dsn };
  return bless $self, $class;
}
# Marks the object as not connected and drops the reference to the
# underlying redis client object.
sub disconnect {
  my($self) = @_;
  # do_log(5, "redis: disconnect");
  $self->{connected} = 0;
  undef $self->{redis};
}
# Callback to be run on a freshly opened connection ($redis is the client
# object): selects the configured database, authenticating first if the
# server asks for it, then tries to give the connection a client name.
# Dies if the database cannot be selected; a failing CLIENT SETNAME is
# only logged. Returns 1.
sub on_connect {
  my $self = shift;
  my $redis = shift;
  my $db_id = $self->{db_id} || 0;
  do_log(5, "redis: on_connect, db_id %d", $db_id);

  my $selected = eval { $redis->call('SELECT', $db_id) eq 'OK' };
  if (!$selected) {
    my $auth_wanted =
      $@ =~ /^NOAUTH\b/ || $@ =~ /^ERR operation not permitted/;
    if (!$auth_wanted) {
      chomp $@;
      die "Command 'SELECT $db_id' failed: $@";
    }
    # the server insists on authentication before accepting commands
    if (!defined $self->{password}) {
      die "Redis server requires authentication, no password provided";
    }
    $redis->call('AUTH', $self->{password});
    $redis->call('SELECT', $db_id);
  }

  my $named = eval {
    $redis->call('CLIENT', 'SETNAME', 'amavis[' . $$ . ']') eq 'OK';
  };
  if (!$named) {  # no big deal, just log
    do_log(5, "redis: command 'CLIENT SETNAME' failed: %s", $@);
  }
  return 1;
}
# Establishes a connection using the first usable entry of the configured
# DSN list and returns the redis client object.
#
# A DSN entry which is a hash ref supplies options: the keys ttl, db_id and
# password are consumed here (password is later used by on_connect), the
# remaining ones are handed over to Amavis::TinyRedis->new. Entries are
# tried in order; after a failure the list is rotated left, so the entry
# which eventually works stays at the front of the list.
# On success the Lua programs are loaded. Dies if no entry yields a client
# object (or if loading the Lua programs fails).
#
sub connect {
  my $self = $_[0];
  # do_log(5, "redis: connect");
  $self->disconnect if $self->{connected};
  $self->{redis} = $self->{db_id} = $self->{ttl} = undef;
  my($r, $err, $dsn, %options);
  my $dsn_list_ref = $self->{redis_dsn};
  for my $j (1 .. @$dsn_list_ref) {
    $dsn = $dsn_list_ref->[0];
    %options = ref $dsn eq 'HASH' ? %$dsn : ();
    # expiration time (time-to-live) defaults to $storage_redis_ttl
    $self->{ttl} = exists $options{ttl} ? $options{ttl} : $storage_redis_ttl;
    $self->{db_id} = $options{db_id};
    # Always take the password from the DSN currently being tried (undef if
    # it has none), so that a password belonging to a previously tried DSN
    # or to an earlier connection is never sent to a different server.
    $self->{password} = $options{password};
    if (defined $options{password}) {
      $options{password} = '(hidden)';  # for logging purposes
    }
    undef $err;
    eval {
      my %opt = %options;  delete @opt{qw(ttl db_id password)};
      $opt{server} = idn_to_ascii($opt{server})  if defined $opt{server};
      $r = Amavis::TinyRedis->new(on_connect => sub { $self->on_connect(@_) },
                                  %opt);
      $r or die "Error: $!";
    } or do {
      undef $r;  $err = $@;  chomp $err;
    };
    $self->{redis} = $r;
    last if $r;  # success, done
    if ($j < @$dsn_list_ref) {  # not all tried yet
      do_log(0, "Can't connect to a redis server, %s: %s; trying next",
                join(' ',%options), $err);
      push(@$dsn_list_ref, shift @$dsn_list_ref);  # rotate left
    }
  }
  if (!$r) {
    $self->{redis} = $self->{db_id} = $self->{ttl} = undef;
    undef $self->{password};  # do not keep credentials of a failed attempt
    die sprintf("Can't connect to a redis server %s: %s\n",
                join(' ',%options), $err);
  }
  $self->{connected} = 1;
  ll(5) && do_log(5, "redis: connected to: %s, ttl %s s",
                  !defined $options{server} ? 'default server'
                                            : join(' ',%options),
                  $self->{ttl}||'x');
  section_time("redis-connect");
  $self->load_lua_programs;
  $r;
}
# Destructor: logs the event and releases the redis client object.
# The error variables and $_ are localized so that object destruction
# cannot disturb the code which happens to be running at that moment.
sub DESTROY {
  my($self) = @_;
  local($@,$!,$_);
  do_log_safe(5,"Amavis::Redis DESTROY called");
  # any error raised while letting go of the redis object is ignored
  eval {
    $self->{connected} = 0;
    undef $self->{redis};
  };
}
# Find a penpals record which proves that a recipient of the current message
# has previously sent a mail to the sender of the current message, i.e. that
# the current message is plausibly a reply. If @$message_id_list is a
# nonempty list of Message-IDs as found in a References header field, the
# query also finds previous outgoing messages with a matching Message-ID but
# possibly to recipients different from what the mail was originally sent to.
#
# Only recipients which are not yet done, are flagged recip_is_local and
# differ from the sender are considered. For each of them with a hit, the
# age in seconds of the best matching previous mail is stored through
# recip_penpals_age() and its mail_id through recip_penpals_related().
#
# Returns 1 if the reply from redis was consistent (nonce and number of
# results as expected), 0 otherwise (the connection is dropped then);
# returns early without a value when there is nothing to query.
#
sub penpals_find {
my($self, $msginfo, $message_id_list) = @_;
my $sender = $msginfo->sender;
$message_id_list = [] if !$message_id_list;
# a null sender with no referenced Message-IDs leaves nothing to look up
return if !@$message_id_list && $sender eq '';
# inbound or internal_to_internal, except self_to_self
my(@per_recip_data) = grep(!$_->recip_done && $_->recip_is_local &&
lc($sender) ne lc($_->recip_addr),
@{$msginfo->per_recip_data});
return if !@per_recip_data;
# do_log(5, "redis: penpals_find");
snmp_count('PenPalsAttempts');
my $sender_smtp = $msginfo->sender_smtp;
# strip the enclosing angle brackets from SMTP-form addresses
local($1); $sender_smtp =~ s/^<(.*)>\z/$1/s;
my(@recip_addresses) =
map { my $a = $_->recip_addr_smtp; $a =~ s/^<(.*)>\z/$1/s; lc $a }
@per_recip_data;
# NOTE: swap recipient and sender in a query here, as we are
# now checking for a potential reply mail - whether the current
# recipient has recently sent any mail to the sender of the
# current mail:
# no need for cryptographical strength, just checking for protocol errors
my $nonce = $msginfo->mail_id;
my $result;
# script arguments: number of keys (none), current time, nonce, the current
# sender, then a counted list of current recipients, then a counted list
# of referenced Message-IDs
my @args = (
0, sprintf("%.0f",$msginfo->rx_time), $nonce, lc $sender_smtp,
scalar @recip_addresses, @recip_addresses,
scalar @$message_id_list, @$message_id_list,
);
eval {
$self->connect if !$self->{connected};
$result =
$self->{redis}->call('EVALSHA', $self->{lua_query_penpals}, @args);
1;
} or do { # Lua function probably not cached, define again and re-try
if ($@ !~ /^NOSCRIPT/) {
$self->disconnect; undef $result; chomp $@;
do_log(-1, 'penpals_find, Redis Lua error: %s', $@);
} else {
# this retry is not wrapped in an eval, a failure here propagates
$self->load_lua_programs;
$result =
$self->{redis}->call('EVALSHA', $self->{lua_query_penpals}, @args);
}
};
my $ok = 1;
if (!$result || !@$result) {
$ok = 0; $self->disconnect;
do_log(0, "redis: penpals_find - no results");
} else {
# the script appends the nonce as the last element of its reply
my $r_nonce = pop(@$result);
if (!defined($r_nonce) || $r_nonce ne $nonce) {
# redis protocol falling out of step?
$ok = 0; $self->disconnect;
do_log(-1,"redis: penpals_find - nonce mismatch, expected %s, got %s",
$nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
}
}
# with the nonce removed, exactly one entry per queried recipient is expected
if ($ok && (@$result != @per_recip_data)) {
$ok = 0; $self->disconnect;
do_log(-1,"redis: penpals_find - number of results expected %d, got %d",
scalar @per_recip_data, scalar @$result);
}
if ($ok) {
# results are consumed in the same order in which recipients were passed
for my $r (@per_recip_data) {
my $result_entry = shift @$result;
next if !$result_entry;
my($sid, $rid, $send_time, $best_ref_mail_id, $report) = @$result_entry;
if (!$send_time) { # undef or empty (or zero)
snmp_count('PenPalsMisses');
ll(4) && do_log(4, "penpals: (redis) not found (%s,%s)%s%s",
$sid ? $sid : $r->recip_addr_smtp,
$rid ? $rid : $msginfo->sender_smtp,
!$report ? '' : ', refs: '.$report,
!@$message_id_list ? '' :
'; '.join(', ',@$message_id_list) );
} else { # found a previous related correspondence
snmp_count('PenPalsHits');
# clamp at zero in case the stored timestamp is later than rx_time
my $age = max(0, $msginfo->rx_time - $send_time);
$r->recip_penpals_age($age);
$r->recip_penpals_related($best_ref_mail_id);
ll(3) && do_log(3, "penpals: (redis) found (%s,%s) age %s%s",
$sid ? $sid : $r->recip_addr_smtp,
$rid ? $rid : $msginfo->sender_smtp,
format_time_interval($age),
!$report ? '' : ', refs: '.$report );
# $age and $best_ref_mail_id are not logged explicitly,
# as they can be seen in the first entry of a lua query report
# (i.e. the last string)
}
}
}
$ok;
}
# Registers the mail_id of a message in redis together with its reception
# time, by means of a Lua script (lua_save_info_preliminary).
# Dies if the message has no mail_id yet. Returns the value reported by the
# script: 1 if the record was added, false otherwise.
sub save_info_preliminary {
  my $self = shift;
  my $msginfo = shift;
  my $mail_id = $msginfo->mail_id;
  die "save_info_preliminary: mail_id still undefined"  if !defined $mail_id;
  $self->connect unless $self->{connected};
  if (ll(5)) {
    do_log(5, 'redis: save_info_preliminary: %s, %s, ttl %s s',
              $mail_id, int($msginfo->rx_time), $self->{ttl} || 'x');
  }
  # use Lua to do HSETNX *and* EXPIRE atomically, otherwise we risk inserting
  # a key with no expiration time if redis server goes down inbetween
  my $redis = $self->{redis};
  my @script_args =
    (1, $mail_id, int($msginfo->rx_time),
     $self->{ttl} ? int($self->{ttl}) : 0);
  my $added;
  my $completed = eval {
    $added = $redis->call('EVALSHA', $self->{lua_save_info_preliminary},
                          @script_args);
    1;
  };
  if (!$completed) {
    if ($@ =~ /^NOSCRIPT/) {
      # Lua function probably not cached, define again and re-try
      $self->load_lua_programs;
      $added = $redis->call('EVALSHA', $self->{lua_save_info_preliminary},
                            @script_args);
    } else {
      $self->disconnect;
      chomp $@;
      do_log(-1, 'save_info_preliminary, Redis Lua error: %s', $@);
    }
  }
  $self->disconnect  if !$database_sessions_persistent;
  return $added;  # 1 if added successfully, false otherwise
}
# Queries the per-IP reputation records of all distinct public IP addresses
# of a message (as given by ip_addr_trace_public) through the Lua script
# lua_query_and_update_ip, which also refreshes each record's last-seen
# time and time-to-live. From the returned counts a score is computed for
# every address with previously counted samples.
#
# Returns a pair ($highest_score, $worst_ip); both are undef when no address
# had previously counted samples. Returns early without a value when the
# message has no public IP addresses.
#
sub query_and_update_ip_reputation {
my($self, $msginfo) = @_;
my $ip_trace_ref = $msginfo->ip_addr_trace_public;
return if !$ip_trace_ref;
my @ip_trace = unique_list($ip_trace_ref);
return if !@ip_trace;
# Irwin-Hall distribution - approximates normal distribution
# n = 4, mean = n/2, variance = n/12, sigma = sqrt(n/12) =~ 0.577
my $normal_random = (rand() + rand() + rand() + rand() - 2) / 0.577;
# script arguments: number of keys, the keys ("ip:" followed by an address),
# reception time, and a random number which the script uses to vary the ttl
my(@args) = (scalar @ip_trace, map("ip:$_",@ip_trace),
sprintf("%.3f", $msginfo->rx_time),
sprintf("%.6f", $normal_random) );
my($r, $ip_stats);
eval {
$self->connect if !$self->{connected};
$r = $self->{redis};
$ip_stats = $r->call('EVALSHA', $self->{lua_query_and_update_ip}, @args);
1;
} or do { # Lua function probably not cached, define again and re-try
if ($@ !~ /^NOSCRIPT/) {
$self->disconnect; chomp $@;
do_log(-1, "query_and_update_ip_reputation, Redis Lua error: %s", $@);
} else {
# this retry is not wrapped in an eval, a failure here propagates
$self->load_lua_programs;
$ip_stats = $r->call('EVALSHA', $self->{lua_query_and_update_ip}, @args);
}
};
my($highest_score, $worst_ip);
for my $entry (!$ip_stats ? () : @$ip_stats) {
# one entry per key: key, counts of all/spam/ham/banned+virus samples,
# times of record creation and last access, and the assigned ttl
my($ip, $n_all, $s, $h, $b, $tfirst, $tlast, $ttl) = @$entry;
$ip =~ s/^ip://s; # strip key prefix
# the current event is not yet counted nor classified
if ($n_all <= 0) {
# no samples counted so far for this address, nothing to score
do_log(5, "redis: IP %s ttl: %.1f h", $ip, $ttl/3600);
} else {
# samples which were counted but not classified as spam, ham or bad
my $n_other = $n_all - ($s + $h + $b);
if ($n_other < 0) { $n_all = $s + $h + $b; $n_other = 0 } # just in case
my $bad_content_ratio = ($s+$b) / $n_all;
# gains strength by the number of samples, watered down by share of ham
my $score = !($s+$b) ? 0 : 0.9 * ($n_all**0.36) * exp(-6 * $h/$n_all);
my $ip_ignore;
if ($score >= 0.05) {
# it is cheaper to do a redis/lookup unconditionally,
# then ditch an ignored IP address later if necessary
my($key, $err);
($ip_ignore, $key, $err) =
lookup_ip_acl($ip, @{ca('ip_repu_ignore_maps')});
# a failed lookup is treated as 'not ignored'
undef $ip_ignore if $err;
}
my $ll = ($score <= 0 || $ip_ignore) ? 3 : 2; # log level
if (ll($ll)) {
my $rxtime = $msginfo->rx_time;
do_log($ll, "redis: IP %s age: %s%s, ttl: %.1f h, %s, %s%s",
$ip, format_time_interval($rxtime-$tfirst),
defined $tlast ? ', last: '.format_time_interval($rxtime-$tlast) :'',
$ttl/3600,
$n_other ?
($b ? "s/h/bv/?: $s/$h/$b/$n_other" : "s/h/?: $s/$h/$n_other")
: ($b ? "s/h/bv: $s/$h/$b" : "s/h: $s/$h"),
$score <= 0 ? 'clean' : sprintf("%.0f%%, score: %.1f",
100*$bad_content_ratio, $score),
$ip_ignore ? ' =>0 ip_repu_ignore' : '');
}
# ignored addresses and negligible scores do not contribute
$score = 0 if $ip_ignore || $score < 0.05;
# remember the address with the highest score seen so far
if (!defined $highest_score || $score > $highest_score) {
$highest_score = $score; $worst_ip = $ip;
}
}
}
$self->disconnect if !$database_sessions_persistent;
($highest_score, $worst_ip);
}
# Appends a JSON-encoded structured report to the redis list $log_key and,
# when $queue_size_limit is nonzero, trims the list to that many most
# recent entries. Both commands are issued through b_call() and their
# results collected with a single b_results().
# Returns 1; returns without a value if no report was given.
sub save_structured_report {
  my($self, $report_ref, $log_key, $queue_size_limit) = @_;
  return  if !$report_ref;
  $self->connect  unless $self->{connected};
  my $redis = $self->{redis};
  # use safe_encode() instead of safe_encode_utf8() here, this way we ensure
  # the resulting string of octets is always a valid UTF-8, even in case
  # of a non-ASCII input string with utf8 flag off
  my $report_json =
    safe_encode('UTF-8', Amavis::JSON::encode($report_ref));  # to octets
  do_log(5, "redis: structured_report: %s %s", $log_key, $report_json);
  $redis->b_call("RPUSH", $log_key, $report_json);
  if ($queue_size_limit) {
    # keep most recent - queue size limit in case noone is pulling events
    $redis->b_call("LTRIM", $log_key, -$queue_size_limit, -1);
  }
  my $res = $redis->b_results;  # errors will be signalled
  if (ll(5)) {
    my $q_size = $res ? join(', ',@$res) : '?';
    do_log(5, "redis: save_structured_report, %d bytes, q_lim=%s, q_size=%s",
              length $report_json, $queue_size_limit || 0, $q_size);
  }
  return 1;
}
# Records the final outcome of a processed message in redis.
#
# 1. If c('enable_ip_repu') is true, the counters of each distinct public IP
#    address of the message which is not matched by ip_repu_ignore_maps are
#    incremented: always the total 'n', and additionally 'b' (virus or
#    banned), 's' (spam) or 'h' (ham) when the content can be classified.
# 2. For originating mail only, the Lua script lua_save_final is invoked
#    with the mail_id, ttl, log_id, Message-ID, client address, sender and
#    the list of recipients the message was passed to; with no such
#    recipients the script is still invoked (and logs as 'deleted').
#
# $report_ref is accepted but not used here.
# Returns 1 if the script echoed back the mail_id, 0 otherwise; returns
# without a value for non-originating mail. Dies if mail_id is undefined.
#
sub save_info_final {
  my($self, $msginfo, $report_ref) = @_;
  $self->connect  if !$self->{connected};
  my $r = $self->{redis};
  if (c('enable_ip_repu')) {
    my $rigm = ca('ip_repu_ignore_maps');
    my $ip_trace_ref = $msginfo->ip_addr_trace_public;
    my @ip_trace;
    # keep addresses which are not to be ignored (or whose lookup failed)
    @ip_trace = grep { my($ignore, $key, $err) = lookup_ip_acl($_, @$rigm);
                       !$ignore || $err;
                     } unique_list($ip_trace_ref)  if $ip_trace_ref;
    if (@ip_trace) {
      my $content =
        $msginfo->is_in_contents_category(CC_VIRUS)  ? 'b' :
        $msginfo->is_in_contents_category(CC_BANNED) ? 'b' : undef;
      if (!defined $content) {  # test for ham or spam
        my($min, $max);
        for my $recip (@{$msginfo->per_recip_data}) {
          my $spam_level = $recip->spam_level;
          next if !defined $spam_level;
          $max = $spam_level  if !defined $max || $spam_level > $max;
          $min = $spam_level  if !defined $min || $spam_level < $min;
        }
        if (defined $min) {
          my $ip_repu_score = $msginfo->ip_repu_score || 0;  # positive or 0
          # avoid self-reinforcing feedback in the IP reputation auto-learning,
          # use the score without the past IP reputation contribution
          if    ($max - $ip_repu_score <  0.5) { $content = 'h' }
          elsif ($min - $ip_repu_score >= 5)   { $content = 's' }
        }
      }
      if (!defined $content) {
        # just increment the total counter
        $r->b_call("HINCRBY", "ip:$_", 'n', 1)  for @ip_trace;
        $r->b_results;
        if (ll(5)) { do_log(5,"redis: IP INCR %s", $_) for @ip_trace }
      } else {
        # content type is known
        for (@ip_trace) {
          $r->b_call("HINCRBY", "ip:$_", 'n', 1);
          $r->b_call("HINCRBY", "ip:$_", $content, 1);
        }
        # results come as two counts per IP address: 'n', then $content
        my $counts = $r->b_results;
        if (ll(5) && $counts) {
          do_log(5,"redis: IP INCR %s n=%d, %s=%d",
                   $_, shift @$counts, $content, shift @$counts) for @ip_trace;
        }
      }
    }
  }
  if (!$msginfo->originating) {
    # don't bother saving info on incoming messages, saves Redis storage
    # while still offering necessary data for a pen pals function
    $self->disconnect  if !$database_sessions_persistent;
    return;
  }
  my $mail_id = $msginfo->mail_id;
  defined $mail_id  or die "save_info_final: mail_id still undefined";
  my $sender_smtp = $msginfo->sender_smtp;
  # strip the enclosing angle brackets from SMTP-form addresses
  local($1);  $sender_smtp =~ s/^<(.*)>\z/$1/s;
  my(@recips);  # only recipients which did receive a message
  for my $recip (@{$msginfo->per_recip_data}) {
    my($dest, $resp) = ($recip->recip_destiny, $recip->recip_smtp_response);
    next if $dest != D_PASS || ($recip->recip_done && $resp !~ /^2/);
    my $addr_smtp = $recip->recip_addr_smtp;
    next if !defined $addr_smtp;
    local($1);  $addr_smtp =~ s/^<(.*)>\z/$1/s;
    # don't remember messages sent to self
    next if lc($sender_smtp) eq lc($addr_smtp);
    # don't remember problematic outgoing messages, even if delivered
    next if $recip->is_in_contents_category(CC_VIRUS)  ||
            $recip->is_in_contents_category(CC_BANNED) ||
            $recip->is_in_contents_category(CC_SPAM)   ||  # kill_level
            $recip->is_in_contents_category(CC_SPAMMY);    # tag2_level
    push(@recips, lc $addr_smtp);
  }
  my $m_id = $msginfo->get_header_field_body('message-id');
  $m_id = join(' ',parse_message_id($m_id))
    if defined $m_id && $m_id ne '';  # strip CFWS
  my(@args) = map(defined $_ ? $_ : '',  # avoid nil in a Lua table
                  ($self->{ttl}, $msginfo->log_id,
                   $m_id, $msginfo->client_addr, lc $sender_smtp, @recips) );
  if (!@recips) {
    do_log(5,"redis: save_info_final: %s deleted", $mail_id);
  } elsif (ll(5)) {
    do_log(5,"redis: save_info_final: %s, passed %d of %d recips, %s",
             $mail_id, scalar @recips, scalar @{$msginfo->per_recip_data},
             join(', ',@args));
  }
  my $result;
  eval {
    $result = $r->call('EVALSHA', $self->{lua_save_final},
                       1, $mail_id, @args);
    1;
  } or do {  # Lua function probably not cached, define again and re-try
    if ($@ !~ /^NOSCRIPT/) {
      $self->disconnect;  undef $result;  chomp $@;
      do_log(-1, "save_info_final, Redis Lua error: %s", $@);
    } else {
      # this retry is not wrapped in an eval, a failure here propagates
      $self->load_lua_programs;
      $result = $r->call('EVALSHA', $self->{lua_save_final},
                         1, $mail_id, @args);
    }
  };
  my $ok = 1;
  # the script returns the mail_id it was given, which serves as a nonce
  my $r_nonce = $result;
  if (!defined($r_nonce) || $r_nonce ne $mail_id) {
    # redis protocol falling out of step?
    $ok = 0;  $self->disconnect;
    do_log(-1,"redis: save_info_final - nonce mismatch, expected %s, got %s",
              $mail_id, defined $r_nonce ? $r_nonce : 'UNDEF');
  }
  # $r->call("EVAL", 'collectgarbage()', 0);
  $self->disconnect  if !$database_sessions_persistent;
  $ok;
}
# Uploads the four Lua programs used by this module to the redis server
# with SCRIPT LOAD and remembers the returned SHA1 digest of each one for
# later use with EVALSHA:
#   lua_save_info_preliminary, lua_save_final,
#   lua_query_and_update_ip,   lua_query_penpals
# If loading any of them fails the connection is dropped and the sub dies.
# Returns 1 on success.
#
sub load_lua_programs($) {
  my $self = $_[0];
  do_log(5, "redis: load_lua_programs");
  my $r = $self->{redis};

  # registers a mail_id with its reception time, with an optional expiration
  eval {
    $self->{lua_save_info_preliminary} = $r->call('SCRIPT', 'LOAD', <<'END');
--LUA_SAVE_INFO_PRELIMINARY
local rcall, tonumber = redis.call, tonumber
local mail_id, rx_time, ttl = KEYS[1], ARGV[1], ARGV[2]
-- ensure the mail_id is unique, report false otherwise
local added = rcall("HSETNX", mail_id, "time", rx_time)
if added == 1 and ttl and tonumber(ttl) > 0 then
if rcall("EXPIRE", mail_id, ttl) ~= 1 then
return { err = "Failed to set EXPIRE on key " .. mail_id }
end
end
return added -- 1:yes, 0:no,failed
END
  } or do {
    $self->disconnect; die "Redis LUA error - lua_save_info_preliminary: $@\n"
  };

  # stores address / Message-ID mappings and sender-recipient relations
  # of a passed message, or deletes the record if there were no recipients
  eval {
    $self->{lua_save_final} = $r->call('SCRIPT', 'LOAD', <<'END');
--LUA_SAVE_FINAL
local mail_id = KEYS[1]
local rcall = redis.call
local ARGV = ARGV
-- not delivered to any recipient, just delete the useless record
if #ARGV < 6 then
rcall("DEL", mail_id)
else
local ttl, log_id, msgid, client_addr, sender = unpack(ARGV,1,5)
local tonumber, unpack = tonumber, unpack
if not tonumber(ttl) or tonumber(ttl) <= 0 then ttl = nil end
local addresses = { [sender] = true }
-- remaining arguments 6 to #ARGV are recipient addresses
for r = 6, #ARGV do addresses[ARGV[r]] = true end
-- create mail address -> id mapping
for addr in pairs(addresses) do
local addr_key = "a:" .. addr
local addr_id
if not ttl then
addr_id = rcall("GET", addr_key)
else
-- to avoid potential race between GET and EXPIRE, set EXPIRE first
local refreshed = rcall("EXPIRE", addr_key, ttl)
if refreshed == 1 then addr_id = rcall("GET", addr_key) end
end
if not addr_id then
-- not found, assign a new id and store the email address
addr_id = rcall("INCR", "last.id.addr") -- get next id, starts at 1
addr_id = tostring(addr_id)
local ok
if ttl then
ok = rcall("SET", addr_key, addr_id, "EX", ttl, "NX")
else
ok = rcall("SET", addr_key, addr_id, "NX")
end
if not ok then
-- shouldn't happen, Lua program runs atomically, but anyway...
addr_id = rcall("GET", addr_key) -- collision, retry
end
end
addresses[addr] = addr_id
end
-- create a Message-ID -> id mapping
local msgid_key = "m:" .. msgid
local msgid_id = rcall("GET", msgid_key)
if msgid_id then -- unlikely duplicate Message-ID, but anyway...
if ttl then rcall("EXPIRE", msgid_key, ttl) end -- extend its lifetime
else
msgid_id = rcall("INCR", "last.id.msgid") -- get next id, starts at 1
msgid_id = tostring(msgid_id)
local ok
if ttl then
ok = rcall("SET", msgid_key, msgid_id, "EX", ttl, "NX")
else
ok = rcall("SET", msgid_key, msgid_id, "NX")
end
if not ok then
-- shouldn't happen, Lua program runs atomically, but anyway...
msgid_id = rcall("GET", msgid_key) -- collision, retry
end
end
-- store additional information to an existing mail_id record
local sender_id = addresses[sender]
rcall("HSET", mail_id, "log", log_id)
-- rcall("HMSET", mail_id, "log", log_id,
-- "msgid", msgid_id, "ip", client_addr, "sender", sender_id)
-- store relations: sender/msgid and sender/recipient pairs
local mapkeys = { "sm:" .. sender_id .. "::" .. msgid_id }
for r = 6, #ARGV do
local recip_id = addresses[ARGV[r]]
-- only the most recent sr:* record is kept, older are overwritten
mapkeys[#mapkeys+1] = "sr:" .. sender_id .. ":" .. recip_id
-- mapkeys[#mapkeys+1] = "srm:" .. sender_id .. ":" .. recip_id ..
-- ":" .. msgid_id
end
if not ttl then
for _,k in ipairs(mapkeys) do rcall("SET", k, mail_id) end
else
for _,k in ipairs(mapkeys) do rcall("SET", k, mail_id, "EX", ttl) end
end
end
return mail_id
END
  } or do {
    $self->disconnect; die "Redis LUA error - lua_save_final: $@\n"
  };

  # reports the counters of each given IP address record, creating or
  # refreshing the record and adjusting its time-to-live
  eval {
    $self->{lua_query_and_update_ip} = $r->call('SCRIPT', 'LOAD', <<'END');
--LUA_QUERY_AND_UPDATE_IP
local rcall, tonumber, unpack, floor, sprintf =
redis.call, tonumber, unpack, math.floor, string.format
local KEYS, ARGV = KEYS, ARGV
local rx_time, normal_random = ARGV[1], tonumber(ARGV[2])
local results = {}
for j = 1, #KEYS do
local ipkey = KEYS[j] -- an IP address, prefixed by "ip:"
local tfirst, tlast -- Unix times of creation and last access
local n, s, h, b -- counts: all, spam, ham, banned+virus
local age, ttl -- time since creation, time to live in seconds
local ip_addr_data =
rcall("HMGET", ipkey, 'tl', 'tf', 'n', 's', 'h', 'b')
if ip_addr_data then
tlast, tfirst, n, s, h, b = unpack(ip_addr_data,1,6)
end
if not tlast then -- does not exist, a new entry is needed
n = 0; tfirst = rx_time; ttl = 3*3600 -- 3 hours for new entries
rcall("HMSET", ipkey, 'tf', rx_time, 'tl', rx_time, 'n', '0')
else -- a record for this IP address exists, collect its counts and age
n = tonumber(n) or 0
local rx_time_n, tfirst_n, tlast_n =
tonumber(rx_time), tonumber(tfirst), tonumber(tlast)
if rx_time_n and tfirst_n and tlast_n then -- valid numbers
age = rx_time_n - tfirst_n -- time since entry creation
local dt = rx_time_n - tlast_n -- time since last occurrence
ttl = 3600 * (n >= 8 and 80 or (3 + n^2.2)) -- 4 to 80 hours
if ttl < 1.5 * dt then ttl = 1.5 * dt end
else -- just in case - ditch a record with invalid fields
n = 0; tfirst = rx_time; ttl = 3*3600
rcall("DEL", ipkey);
rcall("HMSET", ipkey, 'tf', rx_time, 'n', '0')
end
rcall("HMSET", ipkey, 'tl', rx_time) -- update its last-seen time
end
-- the 's', 'h', 'b' and 'n' counts will be updated later
if normal_random then
-- introduce some randomness, don't let spammers depend on a fixed ttl
ttl = ttl * (1 + normal_random * 0.2)
if ttl < 4000 then ttl = 4000 end -- no less than 1h 7min
end
-- set time-to-live in seconds, capped at 3 days, integer
if age and (age + ttl > 3*24*3600) then ttl = 3*24*3600 - age end
if ttl < 1 then
rcall("DEL", ipkey); ttl = 0
else
rcall("EXPIRE", ipkey, floor(ttl))
end
results[#results+1] = { ipkey, n or 0, s or 0, h or 0, b or 0,
tfirst or "", tlast or "", ttl }
end
return results
END
  } or do {
    $self->disconnect; die "Redis LUA error - lua_query_and_update_ip: $@\n"
  };

  # finds, for each given address, the closest matching previous message
  # (by sender/recipient pair or by referenced Message-ID); echoes a nonce
  eval {
    $self->{lua_query_penpals} = $r->call('SCRIPT', 'LOAD', <<'END');
--LUA_QUERY_PENPALS
local tonumber, unpack, sprintf = tonumber, unpack, string.format
local rcall = redis.call
local ARGV = ARGV
local now, nonce, recipient = ARGV[1], ARGV[2], ARGV[3]
local senders_count = tonumber(ARGV[4])
local senders_argv_ofs = 5
local messageid_argv_ofs = senders_argv_ofs + senders_count + 1
local messageid_count = tonumber(ARGV[messageid_argv_ofs - 1])
local q_keys1 = {}
-- current sender as a potential previous recipient
if recipient == '' then recipient = nil end -- nothing ever sent to "<>"
if recipient then
q_keys1[#q_keys1+1] = "a:" .. recipient
end
for j = 1, senders_count do
q_keys1[#q_keys1+1] = "a:" .. ARGV[senders_argv_ofs + j - 1]
end
for j = 1, messageid_count do
q_keys1[#q_keys1+1] = "m:" .. ARGV[messageid_argv_ofs + j - 1]
end
-- map e-mail addresses and referenced Message-IDs to internal id numbers
local q_result = rcall("MGET", unpack(q_keys1))
q_keys1 = nil
local rid -- internal id of a recipient's e-mail addresses
local mids = {} -- internal ids corresponding to referenced "Message-ID"s
local senders = {}
if q_result then
local k = 0;
if recipient then -- nonempty e-mail address, i.e. not "<>"
k = k+1; rid = q_result[k]
end
for j = 1, senders_count do
k = k+1;
if not q_result[k] then senders[j] = false -- non-nil
else senders[j] = { sid = q_result[k] } end
end
for j = 1, messageid_count do
k = k+1; if q_result[k] then mids[q_result[k]] = true end
end
end
q_result = nil
-- prepare query keys to find a closest-matching previous e-mail message
-- for each sender
local q_keys2, belongs_to_sender, on_hit_txt = {}, {}, {}
for _, s in ipairs(senders) do
if s then
-- try sender/Message-ID pairs without a recipient
for m in pairs(mids) do
local nxt = #q_keys2 + 1
q_keys2[nxt] = "sm:" .. s.sid .. "::" .. m
on_hit_txt[nxt] = "mid=" .. m
belongs_to_sender[nxt] = s
end
-- try a sender/recipient pair without a Message-ID ref
if rid then
local nxt = #q_keys2 + 1
q_keys2[nxt] = "sr:" .. s.sid .. ":" .. rid
on_hit_txt[nxt] = "rid=" .. rid
belongs_to_sender[nxt] = s
end
end
end
-- get an internal id (or nil) of a matching mail_id for each query key
local q_result2
if #q_keys2 >= 1 then q_result2 = rcall("MGET", unpack(q_keys2)) end
local msginfo = {} -- data about a message mail_id (e.g. its rx_time)
if q_result2 then
for j = 1, #q_keys2 do
local rx_time_n
local mail_id = q_result2[j]
if not mail_id then
-- no matching mail_id
elseif msginfo[mail_id] then -- already looked-up
rx_time_n = msginfo[mail_id].rx_time_n
else -- not yet looked-up
msginfo[mail_id] = {}
-- see if a record for this mail_id exists, find its timestamp
rx_time_n = tonumber(rcall("HGET", mail_id, "time"))
msginfo[mail_id].rx_time_n = rx_time_n
end
if rx_time_n then -- exists and is a valid number
local s = belongs_to_sender[j]
if not s.hits then s.hits = {} end
if not s.hits[mail_id] then
s.hits[mail_id] = on_hit_txt[j]
else
s.hits[mail_id] = s.hits[mail_id] .. " " .. on_hit_txt[j]
end
-- for each sender manage a sorted list of mail_ids found
if not s.mail_id_list then
s.mail_id_list = { mail_id }
else
-- keep sender's mail_id_list sorted by rx_time, highest first
local mail_id_list = s.mail_id_list
local first_smaller_ind
for j = 1, #mail_id_list do
if msginfo[mail_id_list[j]].rx_time_n <= rx_time_n then
first_smaller_ind = j; break
end
end
table.insert(mail_id_list,
first_smaller_ind or #mail_id_list+1, mail_id)
end
end
end
end
local results = {} -- one entry for each sender, followed by a nonce
for _, s in ipairs(senders) do
if not s or not s.mail_id_list then -- no matching mail_id
results[#results+1] = { s and s.sid or "", rid }
else -- some matches for this sender, compile a report
local report = {}; local mail_id_list = s.mail_id_list
for _, mail_id in ipairs(mail_id_list) do -- first is best
report[#report+1] = sprintf("%s (%.0f s) %s", mail_id,
tonumber(now) - msginfo[mail_id].rx_time_n,
s.hits and s.hits[mail_id] or "")
end
results[#results+1] =
{ s.sid or "", rid or "", msginfo[mail_id_list[1]].rx_time_n,
mail_id_list[1], table.concat(report,", ") }
end
end
results[#results+1] = nonce
return results
END
    1;
  } or do {
    $self->disconnect; die "Redis LUA error - lua_query_penpals: $@\n"
  };

  # the penpals query digest is kept under the key lua_query_penpals
  ll(5) && do_log(5, "redis: SHA fingerprints: final %s, query %s",
     map(substr($_,0,10), @$self{qw(lua_save_final lua_query_penpals)}));
  section_time("redis-load");
  1;
}
1;