diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc index 3d5f32ebac..08dad37a35 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc @@ -23,6 +23,7 @@ See xref:kafka/thread-safety.adoc[Thread Safety]. * `ConsumerRetryAuthEvent`: published when authentication or authorization of a consumer fails and is being retried. * `ConsumerRetryAuthSuccessfulEvent`: published when authentication or authorization has been retried successfully. Can only occur when there has been a `ConsumerRetryAuthEvent` before. * `ContainerStoppedEvent`: published when all consumers have stopped. +* `ConcurrentContainerStoppedEvent`: published when the `ConcurrentMessageListenerContainer` has stopped. IMPORTANT: By default, the application context's event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, you must not invoke any `Consumer` methods when the event contains a reference to the consumer. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 2c2dc2f254..944036ba0c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -21,6 +21,11 @@ The `AbstractConsumerSeekAware` can also now register, retrieve, and remove all See the new APIs (`getSeekCallbacksFor(TopicPartition topicPartition)`, `getTopicsAndCallbacks()`) for more details. For more details, see xref:kafka/seek.adoc#seek[Seek API Docs]. +[x33-concurrent-container-stopped-event]] +=== ConcurrentContainerStoppedEvent + +The `ConcurentContainerMessageListenerContainer` emits now a `ConcurrentContainerStoppedEvent` when all of its child containers are stopped. + [[x33-new-option-ignore-empty-batch]] === Configurable Handling of Empty Batches in Kafka Listener with RecordFilterStrategy diff --git a/spring-kafka/src/main/java/org/springframework/kafka/event/ConcurrentContainerStoppedEvent.java b/spring-kafka/src/main/java/org/springframework/kafka/event/ConcurrentContainerStoppedEvent.java new file mode 100644 index 0000000000..f07e42d193 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/event/ConcurrentContainerStoppedEvent.java @@ -0,0 +1,55 @@ +/* + * Copyright 2018-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.event; + +/** + * An event published when a concurrent container is stopped. + * + * @author Lokesh Alamuri + * @since 3.3 + * + */ +public class ConcurrentContainerStoppedEvent extends KafkaEvent { + + private static final long serialVersionUID = 1L; + + private final ConsumerStoppedEvent.Reason reason; + + /** + * Construct an instance with the provided source and container. + * @param source the container instance that generated the event. + * @param reason the reason. + */ + public ConcurrentContainerStoppedEvent(Object source, ConsumerStoppedEvent.Reason reason) { + super(source, source); + this.reason = reason; + } + + /** + * Return the reason why the container was stopped. + * @return the reason. + */ + public ConsumerStoppedEvent.Reason getReason() { + return this.reason; + } + + @Override + public String toString() { + return "ConcurrentContainerStoppedEvent [source=" + getSource() + ", reason=" + this.reason + "]"; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index b8883ad935..4e8b7f9464 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -35,6 +35,7 @@ import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.event.ConcurrentContainerStoppedEvent; import org.springframework.kafka.event.ConsumerStoppedEvent.Reason; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.lang.Nullable; @@ -65,7 +66,7 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis private final List executors = new ArrayList<>(); - private final AtomicInteger stoppedContainers = new AtomicInteger(); + private final AtomicInteger startedContainers = new AtomicInteger(); private int concurrency = 1; @@ -243,6 +244,7 @@ protected void doStart() { + topicPartitions.length); this.concurrency = topicPartitions.length; } + this.startedContainers.set(0); setRunning(true); for (int i = 0; i < this.concurrency; i++) { @@ -374,24 +376,38 @@ protected void doStop(final Runnable callback, boolean normal) { } } + @Override + public void childStarted(MessageListenerContainer child) { + this.startedContainers.incrementAndGet(); + } + @Override public void childStopped(MessageListenerContainer child, Reason reason) { if (this.reason == null || reason.equals(Reason.AUTH)) { this.reason = reason; } - if (Reason.AUTH.equals(this.reason) - && getContainerProperties().isRestartAfterAuthExceptions() - && this.concurrency == this.stoppedContainers.incrementAndGet()) { + int startedContainersCount = this.startedContainers.decrementAndGet(); + if (startedContainersCount == 0) { + publishConcurrentContainerStoppedEvent(this.reason); + if (Reason.AUTH.equals(this.reason) + && getContainerProperties().isRestartAfterAuthExceptions()) { - this.reason = null; - this.stoppedContainers.set(0); + this.reason = null; - // This has to run on another thread to avoid a deadlock on lifecycleMonitor - AsyncTaskExecutor exec = getContainerProperties().getListenerTaskExecutor(); - if (exec == null) { - exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart"); + // This has to run on another thread to avoid a deadlock on lifecycleMonitor + AsyncTaskExecutor exec = getContainerProperties().getListenerTaskExecutor(); + if (exec == null) { + exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart"); + } + exec.execute(this::start); } - exec.execute(this::start); + } + } + + private void publishConcurrentContainerStoppedEvent(Reason reason) { + ApplicationEventPublisher eventPublisher = getApplicationEventPublisher(); + if (eventPublisher != null) { + eventPublisher.publishEvent(new ConcurrentContainerStoppedEvent(this, reason)); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 3755c62620..94926c3ef5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -378,6 +378,7 @@ protected void doStart() { } } this.listenerConsumer = new ListenerConsumer(listener, listenerType, observationRegistry); + this.thisOrParentContainer.childStarted(this); setRunning(true); this.startLatch = new CountDownLatch(1); this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java index def96b35fe..736744e694 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java @@ -281,6 +281,14 @@ default MessageListenerContainer getContainerFor(String topic, int partition) { default void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason) { } + /** + * Notify a parent container that a child container has started. + * @param child the container. + * @since 3.3 + */ + default void childStarted(MessageListenerContainer child) { + } + @Override default void destroy() { stop(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index 3a8ba40738..938fb14736 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -59,6 +59,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.event.ConcurrentContainerStoppedEvent; import org.springframework.kafka.event.ContainerStoppedEvent; import org.springframework.kafka.event.KafkaEvent; import org.springframework.kafka.support.TopicPartitionOffset; @@ -157,11 +158,15 @@ protected Consumer createKafkaConsumer(String groupId, String c container.setChangeConsumerThreadName(true); BlockingQueue events = new LinkedBlockingQueue<>(); CountDownLatch stopLatch = new CountDownLatch(4); + CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1); container.setApplicationEventPublisher(e -> { events.add((KafkaEvent) e); if (e instanceof ContainerStoppedEvent) { stopLatch.countDown(); } + if (e instanceof ConcurrentContainerStoppedEvent) { + concurrentContainerStopLatch.countDown(); + } }); CountDownLatch intercepted = new CountDownLatch(4); container.setRecordInterceptor((record, consumer) -> { @@ -205,6 +210,7 @@ protected Consumer createKafkaConsumer(String groupId, String c container.getContainers().get(0).start(); container.stop(); assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(concurrentContainerStopLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(container.isInExpectedState()).isTrue(); events.forEach(e -> { assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container); @@ -216,6 +222,10 @@ protected Consumer createKafkaConsumer(String groupId, String c assertThat(children).contains((KafkaMessageListenerContainer) e.getSource()); } } + else if (e instanceof ConcurrentContainerStoppedEvent) { + assertThat(e.getSource()).isSameAs(container); + assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container); + } else { assertThat(children).contains((KafkaMessageListenerContainer) e.getSource()); }