From e0d224b2899c7bc909bbf4c04675f677b4878568 Mon Sep 17 00:00:00 2001 From: Avital Ofstein Date: Thu, 23 Mar 2023 22:41:40 +0200 Subject: [PATCH] Support Nack functionality for AMQP implementation - avoid consumers disconnection after staying idle for a time period, due to cases when messages are not being handled (not acked). Continue of implementation of PR #3371 in Conductor Core repo. --- .../queue/amqp/AMQPObservableQueue.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/event-queue/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/event-queue/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index 0f789e00b..39c4a226a 100644 --- a/event-queue/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/event-queue/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -231,6 +231,43 @@ public void ackMsg(Message message) throws Exception { } } + @Override + public void nack(List messages) { + for (final Message message : messages) { + int retryIndex = 1; + while (true) { + try { + LOGGER.info("NACK message with delivery tag {}", message.getReceipt()); + Channel chn = + amqpConnection.getOrCreateChannel( + ConnectionType.SUBSCRIBER, + getSettings().getQueueOrExchangeName()); + chn.basicNack(Long.parseLong(message.getReceipt()), false, false); + LOGGER.info("Nack'ed the message with delivery tag {}", message.getReceipt()); + break; + } catch (final Exception e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + LOGGER.error( + "Cannot NACK message with delivery tag {}", + message.getReceipt(), + e); + } + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + LOGGER.error( + "Retries completed. Cannot NACK message with delivery tag {}", + message.getReceipt(), + e); + break; + } + retryIndex++; + } + } + } + } + private static AMQP.BasicProperties buildBasicProperties( final Message message, final AMQPSettings settings) { return new AMQP.BasicProperties.Builder()