From 8d64e04b19f3c6a020140b7032819adf471de74f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=93mar=20Yasin?= Date: Mon, 12 Aug 2024 09:34:08 -0700 Subject: [PATCH 1/3] [fix][client] Create the retry producer async MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The consumer implementation is creating the retry producer synchronously, potentially unexpectedly blocking the calling thread. This changes modifies this behaviour to fit with how the dead letter queue producer is created—i.e. using the async APIs offloaded to the internal pinned executor. --- .../pulsar/client/impl/ConsumerImpl.java | 92 ++++++++++--------- 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 1806d13493b2f..4cf76f214f804 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -203,7 +203,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private volatile CompletableFuture> deadLetterProducer; - private volatile Producer retryLetterProducer; + private volatile CompletableFuture> retryLetterProducer; private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock(); protected volatile boolean paused; @@ -643,6 +643,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a Map customProperties, long delayTime, TimeUnit unit) { + MessageId messageId = message.getMessageId(); if (messageId == null) { return FutureUtil.failedFuture(new PulsarClientException @@ -659,29 +660,8 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a } return FutureUtil.failedFuture(exception); } - if (delayTime < 0) { - delayTime = 0; - } - if (retryLetterProducer == null) { - createProducerLock.writeLock().lock(); - try { - if (retryLetterProducer == null) { - retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) - .topic(this.deadLetterPolicy.getRetryLetterTopic()) - .enableBatching(false) - .enableChunking(true) - .blockIfQueueFull(false) - .create(); - stats.setRetryLetterProducerStats(retryLetterProducer.getStats()); - } - } catch (Exception e) { - log.error("Create retry letter producer exception with topic: {}", - deadLetterPolicy.getRetryLetterTopic(), e); - return FutureUtil.failedFuture(e); - } finally { - createProducerLock.writeLock().unlock(); - } - } + + initRetryLetterProducerIfNeeded(); CompletableFuture result = new CompletableFuture<>(); if (retryLetterProducer != null) { try { @@ -701,7 +681,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a } propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumeTimes)); propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, - String.valueOf(unit.toMillis(delayTime))); + String.valueOf(unit.toMillis(delayTime < 0 ? 0 : delayTime))); MessageId finalMessageId = messageId; if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount() @@ -732,23 +712,25 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a }); } else { assert retryMessage != null; - TypedMessageBuilder typedMessageBuilderNew = retryLetterProducer - .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) - .value(retryMessage.getData()) - .properties(propertiesMap); - if (delayTime > 0) { - typedMessageBuilderNew.deliverAfter(delayTime, unit); - } - if (message.hasKey()) { - typedMessageBuilderNew.key(message.getKey()); - } - typedMessageBuilderNew.sendAsync() - .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) - .thenAccept(v -> result.complete(null)) - .exceptionally(ex -> { - result.completeExceptionally(ex); - return null; - }); + retryLetterProducer.thenAcceptAsync(rtlProducer -> { + TypedMessageBuilder typedMessageBuilderNew = rtlProducer + .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) + .value(retryMessage.getData()) + .properties(propertiesMap); + if (delayTime > 0) { + typedMessageBuilderNew.deliverAfter(delayTime, unit); + } + if (message.hasKey()) { + typedMessageBuilderNew.key(message.getKey()); + } + typedMessageBuilderNew.sendAsync() + .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) + .thenAccept(v -> result.complete(null)) + .exceptionally(ex -> { + result.completeExceptionally(ex); + return null; + }); + }, internalPinnedExecutor); } } catch (Exception e) { result.completeExceptionally(e); @@ -757,7 +739,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a MessageId finalMessageId = messageId; result.exceptionally(ex -> { log.error("Send to retry letter topic exception with topic: {}, messageId: {}", - retryLetterProducer.getTopic(), finalMessageId, ex); + this.deadLetterPolicy.getRetryLetterTopic(), finalMessageId, ex); Set messageIds = Collections.singleton(finalMessageId); unAckedMessageTracker.remove(finalMessageId); redeliverUnacknowledgedMessages(messageIds); @@ -1136,7 +1118,7 @@ public synchronized CompletableFuture closeAsync() { ArrayList> closeFutures = new ArrayList<>(4); closeFutures.add(closeFuture); if (retryLetterProducer != null) { - closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> { + closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { if (ex != null) { log.warn("Exception ignored in closing retryLetterProducer of consumer", ex); } @@ -2267,6 +2249,28 @@ private void initDeadLetterProducerIfNeeded() { } } + private void initRetryLetterProducerIfNeeded() { + if (retryLetterProducer == null) { + createProducerLock.writeLock().lock(); + try { + if (retryLetterProducer == null) { + retryLetterProducer = client + .newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) + .topic(this.deadLetterPolicy.getRetryLetterTopic()) + .enableBatching(false) + .enableChunking(true) + .blockIfQueueFull(false) + .createAsync(); + retryLetterProducer.thenAccept(rtlProducer -> { + stats.setDeadLetterProducerStats(rtlProducer.getStats()); + }); + } + } finally { + createProducerLock.writeLock().unlock(); + } + } + } + @Override public void seek(MessageId messageId) throws PulsarClientException { try { From f634dedd3bde9e52d447dc75e4f5dc87ca4efcbd Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 13 Aug 2024 07:44:17 -0700 Subject: [PATCH 2/3] Fix retry letter producer stats --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 4cf76f214f804..9107f4b6c1da2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2262,7 +2262,7 @@ private void initRetryLetterProducerIfNeeded() { .blockIfQueueFull(false) .createAsync(); retryLetterProducer.thenAccept(rtlProducer -> { - stats.setDeadLetterProducerStats(rtlProducer.getStats()); + stats.setRetryLetterProducerStats(rtlProducer.getStats()); }); } } finally { From 4c5f92d008bcad2c58698669b312cb901adc5cfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=93mar=20Yasin?= Date: Tue, 13 Aug 2024 14:35:00 -0700 Subject: [PATCH 3/3] Propagate failure when Retry Letter producer creation fails --- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 9107f4b6c1da2..3acf55afaed51 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -730,7 +730,11 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a result.completeExceptionally(ex); return null; }); - }, internalPinnedExecutor); + }, internalPinnedExecutor).exceptionally(ex -> { + result.completeExceptionally(ex); + retryLetterProducer = null; + return null; + }); } } catch (Exception e) { result.completeExceptionally(e);