Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZMS-56 #363

Merged
merged 4 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions bin/check-bounce.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ process.stdin.on('data', chunk => {
});

process.stdin.on('end', () => {
let str = Buffer.concat(chunks)
.toString()
.trim();
let str = Buffer.concat(chunks).toString().trim();
let bounceInfo = bounces.check(str);
console.log('data : %s', str.replace(/\n/g, '\n' + ' '.repeat(11)));
Object.keys(bounceInfo || {}).forEach(key => {
Expand Down
7 changes: 7 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ module.exports = {
},
disableInterfaces: ['forwarder'], // do not bounce messages from this interface
sendingZone: 'bounces',

// Send a warning email about delayed delivery
delayEmail: {
enabled: true,
after: 3 * 3600 * 1000 // 3h
},

zoneConfig: {
// specify zone specific bounce options
myzonename: {
Expand Down
95 changes: 95 additions & 0 deletions lib/bounces.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,101 @@ module.exports.check = (input, category) => {
};
};

module.exports.canSendBounce = (delivery, options) => {
options = options || {};
let logName = options.logName || 'Bounce';

if (delivery.skipBounce) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s as defined by routing',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>'
);
return false;
}

if (/^mailer-daemon@/i.test(delivery.from) || !delivery.from) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s due to envelope (MAIL FROM=%s)',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>',
JSON.stringify(delivery.from || '')
.replace(/"/g, '')
.trim() || '<>'
);
return false;
}

let xAutoResponseSuppress = delivery.headers.getFirst('X-Auto-Response-Suppress');
if (/\ball\b/i.test(xAutoResponseSuppress)) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s due to header (%s=%s)',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>',
'X-Auto-Response-Suppress',
JSON.stringify(xAutoResponseSuppress).replace(/"/g, '').trim()
);
return false;
}

let autoSubmitted = delivery.headers.getFirst('Auto-Submitted');
if (/\bauto-(generated|replied)\b/i.test(autoSubmitted)) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s due to header (%s=%s)',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>',
'Auto-Submitted',
JSON.stringify(autoSubmitted).replace(/"/g, '').trim()
);
return false;
}

let contentType = delivery.headers.getFirst('Content-Type');
if (/^multipart\/report\b/i.test(contentType)) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s due to header (%s=%s)',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>',
'Content-Type',
'multipart/report'
);
return false;
}

if (delivery.parsedEnvelope && /^mailer-daemon@/i.test(delivery.parsedEnvelope.from)) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s due to header (%s=%s)',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>',
'From',
JSON.stringify(delivery.parsedEnvelope.from || '<>')
.replace(/"/g, '')
.trim() || '<>'
);
return false;
}

return true;
};

function formatSMTPResponse(str) {
str = (str || '').toString().trim();
let code = str.match(/^\d{3}[\s-]+([\d.]+\s*)?/);
Expand Down
2 changes: 2 additions & 0 deletions lib/ip-tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class RedisCache {
return callback();
}

log.silly(logKey, 'DNSCACHE SET key=%s value=%s', key, JSON.stringify(value));

db.redis
.multi()
.set('dns:' + key, JSON.stringify(value))
Expand Down
96 changes: 79 additions & 17 deletions lib/mail-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ const QueueLocker = require('./queue-locker');
const TtlCache = require('./ttl-cache');
const crypto = require('crypto');
const plugins = require('./plugins');
const Headers = require('mailsplit').Headers;
const db = require('./db');
const GridFSBucket = require('mongodb').GridFSBucket;
const ObjectId = require('mongodb').ObjectId;
const internalCounters = require('./counters');
const bounces = require('./bounces');
const MailDrop = require('./mail-drop');
const yaml = require('js-yaml');
const fs = require('fs');
const pathlib = require('path');
Expand Down Expand Up @@ -47,6 +50,7 @@ class MailQueue {
this.closing = false;
this.garbageTimer = null;
this.seqIndex = new SeqIndex();
this.maildrop = new MailDrop(this);

this.cache = new TtlCache(); // shared cache for workers
this.locks = new QueueLocker();
Expand Down Expand Up @@ -745,39 +749,46 @@ class MailQueue {
*
* @param {Object} delivery Message object
* @param {Number} ttl TTL in ms. Once this time is over the message is reinserted to queue
* @param {Number} responseData SMTP response or description
* @param {Object} responseData SMTP response or description
* @param {Function} callback Run once the message is removed from active queue
*/
deferDelivery(delivery, ttl, responseData, callback) {
// add metainfo about postponing the delivery
delivery._deferred = delivery._deferred || {
first: Date.now(),
count: 0
};
delivery._deferred.count++;
delivery._deferred.last = Date.now();
delivery._deferred.next = Date.now() + ttl;
delivery._deferred.response = responseData.response;
delivery._deferred.log = responseData.log || delivery._deferred.log;
delivery.queued = new Date(Math.max(delivery._deferred.next, Date.now()));
delivery.locked = false;

const now = Date.now();

let updates = {
$set: {
_deferred: delivery._deferred,
queued: delivery.queued,
'_deferred.last': now,
'_deferred.next': now + ttl,
queued: new Date(now + ttl),
locked: false
},
$inc: {
'_deferred.count': 1
}
};

if (!delivery._deferred) {
updates.$set['_deferred.first'] = now;
}

if (responseData.response) {
updates.$set['_deferred.response'] = responseData.response;
}

if (responseData.log) {
updates.$set['_deferred.log'] = responseData.log;
}

if (responseData.updates && typeof responseData.updates === 'object') {
Object.keys(responseData.updates).forEach(key => {
if (key.charAt(0) === '$') {
if (!['$inc', '$mul'].includes(key)) {
return; // not allowed
}
// $inc etc.
updates[key] = responseData.updates[key];
updates[key] = Object.assign(updates[key] || {}, responseData.updates[key]);
return;
}
if (!updates.$set.hasOwnProperty(key)) {
Expand All @@ -787,13 +798,16 @@ class MailQueue {
}

let collection = this.mongodb.collection(this.options.collection);
collection.updateOne(
collection.findOneAndUpdate(
{
id: delivery.id,
seq: delivery.seq
},
updates,
err => {
{
returnOriginal: true
},
(err, item) => {
if (err) {
return callback(err);
}
Expand All @@ -802,6 +816,54 @@ class MailQueue {
log.verbose('Queue', '%s.%s UNLOCK (key="%s")', delivery.id, delivery.seq, delivery._lock);
this.locks.release(delivery._lock);

if (item && item.value) {
let firstCheck = item.value._deferred && item.value._deferred.first;
let prevLastCheck = item.value._deferred && item.value._deferred.last;
let lastCheck = now;

if (firstCheck && prevLastCheck) {
return this.getMeta(delivery.id, (err, meta) => {
if (err) {
// ignore
log.error('Queue', '%s.%s GET META %s', delivery.id, delivery.seq, err.message);
return callback(null, true);
}

let deliveryEntry = Object.assign(item.value, meta || {});
deliveryEntry.headers = new Headers(deliveryEntry.headers);

deliveryEntry.envelope = {
from: deliveryEntry.from,
to: deliveryEntry.recipient
};

if (!bounces.canSendBounce(deliveryEntry, { logName: 'Queue' })) {
return false;
}

return plugins.handler.runHooks(
'queue:delayed',
[
Object.assign({}, deliveryEntry, responseData),
this.maildrop,
{
first: firstCheck,
prev: prevLastCheck,
last: lastCheck
}
],
err => {
if (err) {
log.error('Queue', '%s.%s queue:delayed %s', deliveryEntry.id, deliveryEntry.seq, err.message);
}

return callback(null, true);
}
);
});
}
}

return callback(null, true);
}
);
Expand Down
6 changes: 4 additions & 2 deletions lib/queue-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class QueueServer {
);
});
}
case 'DEFER':
case 'DEFER': {
if (!client.zone) {
return client.send({
req: data.req,
Expand All @@ -190,6 +190,7 @@ class QueueServer {
deliveryStatusCounter.inc({
status: 'deferred'
});

return this.deferDelivery(client.zone, client.id, data, (err, response) => {
if (!client) {
// client already errored or closed
Expand All @@ -206,11 +207,12 @@ class QueueServer {
response
});
});
}

case 'BOUNCE':
{
bounceCounter.inc();
let bounce = data;
const bounce = data;
bounce.headers = new Headers(bounce.headers || []);
plugins.handler.runHooks(
'queue:bounce',
Expand Down
Loading