Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh 3340 #3347

Closed
wants to merge 17 commits into from
Closed

Gh 3340 #3347

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ See xref:kafka/thread-safety.adoc[Thread Safety].
* `ConsumerRetryAuthEvent`: published when authentication or authorization of a consumer fails and is being retried.
* `ConsumerRetryAuthSuccessfulEvent`: published when authentication or authorization has been retried successfully. Can only occur when there has been a `ConsumerRetryAuthEvent` before.
* `ContainerStoppedEvent`: published when all consumers have stopped.
* `ConcurrentContainerStoppedEvent`: published when the `ConcurrentMessageListenerContainer` has stopped.

IMPORTANT: By default, the application context's event multicaster invokes event listeners on the calling thread.
If you change the multicaster to use an async executor, you must not invoke any `Consumer` methods when the event contains a reference to the consumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ The `AbstractConsumerSeekAware` can also now register, retrieve, and remove all
See the new APIs (`getSeekCallbacksFor(TopicPartition topicPartition)`, `getTopicsAndCallbacks()`) for more details.
For more details, see xref:kafka/seek.adoc#seek[Seek API Docs].

[x33-concurrent-container-stopped-event]]
=== ConcurrentContainerStoppedEvent
artembilan marked this conversation as resolved.
Show resolved Hide resolved

The `ConcurentContainerMessageListenerContainer` emits now a `ConcurrentContainerStoppedEvent` when all of its child containers are stopped.

[[x33-new-option-ignore-empty-batch]]
=== Configurable Handling of Empty Batches in Kafka Listener with RecordFilterStrategy

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.event;

/**
* An event published when a concurrent container is stopped.
*
* @author Lokesh Alamuri
* @since 3.3
*
*/
public class ConcurrentContainerStoppedEvent extends KafkaEvent {

private static final long serialVersionUID = 1L;

private final ConsumerStoppedEvent.Reason reason;

/**
* Construct an instance with the provided source and container.
* @param source the container instance that generated the event.
* @param reason the reason.
*/
public ConcurrentContainerStoppedEvent(Object source, ConsumerStoppedEvent.Reason reason) {
super(source, source);
this.reason = reason;
}

/**
* Return the reason why the container was stopped.
* @return the reason.
*/
public ConsumerStoppedEvent.Reason getReason() {
return this.reason;
}

@Override
public String toString() {
return "ConcurrentContainerStoppedEvent [source=" + getSource() + ", reason=" + this.reason + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ConcurrentContainerStoppedEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis

private final List<AsyncTaskExecutor> executors = new ArrayList<>();

private final AtomicInteger stoppedContainers = new AtomicInteger();
private final AtomicInteger startedContainers = new AtomicInteger();

private int concurrency = 1;

Expand Down Expand Up @@ -243,6 +244,7 @@ protected void doStart() {
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
this.startedContainers.set(0);
setRunning(true);

for (int i = 0; i < this.concurrency; i++) {
Expand Down Expand Up @@ -374,24 +376,38 @@ protected void doStop(final Runnable callback, boolean normal) {
}
}

@Override
public void childStarted(MessageListenerContainer child) {
this.startedContainers.incrementAndGet();
}

@Override
public void childStopped(MessageListenerContainer child, Reason reason) {
if (this.reason == null || reason.equals(Reason.AUTH)) {
this.reason = reason;
}
if (Reason.AUTH.equals(this.reason)
&& getContainerProperties().isRestartAfterAuthExceptions()
&& this.concurrency == this.stoppedContainers.incrementAndGet()) {
int startedContainersCount = this.startedContainers.decrementAndGet();
if (startedContainersCount == 0) {
publishConcurrentContainerStoppedEvent(this.reason);
if (Reason.AUTH.equals(this.reason)
&& getContainerProperties().isRestartAfterAuthExceptions()) {

this.reason = null;
this.stoppedContainers.set(0);
this.reason = null;

// This has to run on another thread to avoid a deadlock on lifecycleMonitor
AsyncTaskExecutor exec = getContainerProperties().getListenerTaskExecutor();
if (exec == null) {
exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
// This has to run on another thread to avoid a deadlock on lifecycleMonitor
AsyncTaskExecutor exec = getContainerProperties().getListenerTaskExecutor();
if (exec == null) {
exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
}
exec.execute(this::start);
}
exec.execute(this::start);
}
}

private void publishConcurrentContainerStoppedEvent(Reason reason) {
ApplicationEventPublisher eventPublisher = getApplicationEventPublisher();
if (eventPublisher != null) {
eventPublisher.publishEvent(new ConcurrentContainerStoppedEvent(this, reason));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ protected void doStart() {
}
}
this.listenerConsumer = new ListenerConsumer(listener, listenerType, observationRegistry);
this.thisOrParentContainer.childStarted(this);
setRunning(true);
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ default MessageListenerContainer getContainerFor(String topic, int partition) {
default void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason) {
}

/**
* Notify a parent container that a child container has started.
* @param child the container.
* @since 3.3
*/
default void childStarted(MessageListenerContainer child) {
}

@Override
default void destroy() {
stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.event.ConcurrentContainerStoppedEvent;
import org.springframework.kafka.event.ContainerStoppedEvent;
import org.springframework.kafka.event.KafkaEvent;
import org.springframework.kafka.support.TopicPartitionOffset;
Expand Down Expand Up @@ -157,11 +158,15 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
container.setChangeConsumerThreadName(true);
BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
CountDownLatch stopLatch = new CountDownLatch(4);
CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1);
container.setApplicationEventPublisher(e -> {
events.add((KafkaEvent) e);
if (e instanceof ContainerStoppedEvent) {
stopLatch.countDown();
}
if (e instanceof ConcurrentContainerStoppedEvent) {
concurrentContainerStopLatch.countDown();
}
});
CountDownLatch intercepted = new CountDownLatch(4);
container.setRecordInterceptor((record, consumer) -> {
Expand Down Expand Up @@ -205,6 +210,7 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
container.getContainers().get(0).start();
container.stop();
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(concurrentContainerStopLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(container.isInExpectedState()).isTrue();
events.forEach(e -> {
assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container);
Expand All @@ -216,6 +222,10 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
assertThat(children).contains((KafkaMessageListenerContainer<Integer, String>) e.getSource());
}
}
else if (e instanceof ConcurrentContainerStoppedEvent) {
assertThat(e.getSource()).isSameAs(container);
assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also assert a new reason here, please?

}
else {
assertThat(children).contains((KafkaMessageListenerContainer<Integer, String>) e.getSource());
}
Expand Down