Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh 3340 #3347

Closed
wants to merge 17 commits into from
Closed

Gh 3340 #3347

Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ See xref:kafka/thread-safety.adoc[Thread Safety].
* `ConsumerRetryAuthEvent`: published when authentication or authorization of a consumer fails and is being retried.
* `ConsumerRetryAuthSuccessfulEvent`: published when authentication or authorization has been retried successfully. Can only occur when there has been a `ConsumerRetryAuthEvent` before.
* `ContainerStoppedEvent`: published when all consumers have stopped.
* `ConcurrentContainerStoppedEvent`: published when the `ConcurrentMessageListenerContainer` has stopped.

IMPORTANT: By default, the application context's event multicaster invokes event listeners on the calling thread.
If you change the multicaster to use an async executor, you must not invoke any `Consumer` methods when the event contains a reference to the consumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ A new method, `getGroupId()`, has been added to the `ConsumerSeekCallback` inter
This method allows for more selective seek operations by targeting only the desired consumer group.
For more details, see xref:kafka/seek.adoc#seek[Seek API Docs].

[x33-concurrent-container-stopped-event]]
=== ConcurrentContainerStoppedEvent
artembilan marked this conversation as resolved.
Show resolved Hide resolved

The `ConcurentContainerMessageListenerContainer` emits now a `ConcurrentContainerStoppedEvent` when all of its child containers are stopped.

Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.event;

/**
* An event published when a concurrent container is stopped.
*
* @author Lokesh Alamuri
* @since 3.3
*
*/
public class ConcurrentContainerStoppedEvent extends KafkaEvent {

private static final long serialVersionUID = 1L;

/**
* Construct an instance with the provided source and container.
* @param source the container instance that generated the event.
* @since 3.3
artembilan marked this conversation as resolved.
Show resolved Hide resolved
*/
public ConcurrentContainerStoppedEvent(Object source) {
super(source, source);
}

@Override
public String toString() {
return "ConcurrentContainerStoppedEvent [source=" + getSource() + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ConcurrentContainerStoppedEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis

private final List<AsyncTaskExecutor> executors = new ArrayList<>();

private final AtomicInteger stoppedContainers = new AtomicInteger();
private final AtomicInteger startedContainers = new AtomicInteger();

private int concurrency = 1;

Expand Down Expand Up @@ -243,6 +244,7 @@ protected void doStart() {
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
this.startedContainers.set(0);
setRunning(true);

for (int i = 0; i < this.concurrency; i++) {
Expand Down Expand Up @@ -374,24 +376,38 @@ protected void doStop(final Runnable callback, boolean normal) {
}
}

@Override
public void childStarted(MessageListenerContainer child) {
this.startedContainers.incrementAndGet();
}

@Override
public void childStopped(MessageListenerContainer child, Reason reason) {
if (this.reason == null || reason.equals(Reason.AUTH)) {
this.reason = reason;
}
if (Reason.AUTH.equals(this.reason)
&& getContainerProperties().isRestartAfterAuthExceptions()
&& this.concurrency == this.stoppedContainers.incrementAndGet()) {
int startedContainersCount = this.startedContainers.decrementAndGet();
if (startedContainersCount == 0) {
publishConcurrentContainerStoppedEvent();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we really should emit this event here unconditionally.
Looks like in case of Reason.AUTH we restart the container immediately.
Therefore we go to the state when children are running, so ConcurrentContainerStoppedEvent is loosing its purpose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentContainer restart happens only when all the containers are stopped. Whenever all the containers are stopped, then as per the contract ConcurrentContainerStopped event has to be published.

Please give your comments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, but are they really stopped if we restart it immediately?
That's my concern if we should rethink this situation and don't emit that ConcurrentContainerStoppedEvent if we restart immediately.

According to your original request about stopping the whole application context on that event, such a restart might cause some problem.
So, why don't we change the logic to be sure that this is exactly a moment for ConcurrentContainerStoppedEvent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got your point. I am good to make change so that ConcurrentContainerStoppedEvent is not published when AUTH error occurred. But, I am having one concern and one suggestion.

Concern:
Whatever may be the scenario, as per the contract it is good to publish ConcurrentContainerStoppedEvent when all the containers are stopped.

Suggestion:
Error reason could be added to ConcurrentContainerStoppedEvent, so that application user would take a right decision.

public ConcurrentContainerStoppedEvent(Object source, Reason reason)

Please give your idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but are they really stopped if we restart it immediately?

Yes, childStopped API will be called only when the container stopped.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that sounds good to have that Reason propagated down to the ConcurrentContainerStoppedEvent.
Thank you!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we propagate all reasons for each container as below. As of now, only Auth reason is propagated. Is that sufficient.

public ConcurrentContainerStoppedEvent(Object source, ConsumerStoppedEvent.Reason[] reason) -- all reasons will be propagated.

or

public ConcurrentContainerStoppedEvent(Object source, Reason reason) -- only Auth reason propagated. Other will not be propagated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we propagate only Reason.AUTH, then no reason in this reason property for the event.
I don't think we have to propagate all of them.
The logic is like this:

	if (this.reason == null || reason.equals(Reason.AUTH)) {
			this.reason = reason;
		}

So, any first reason is going to be use (or AUTH) when it overrides the value of that property.

if (Reason.AUTH.equals(this.reason)
&& getContainerProperties().isRestartAfterAuthExceptions()) {

this.reason = null;
this.stoppedContainers.set(0);
this.reason = null;

// This has to run on another thread to avoid a deadlock on lifecycleMonitor
AsyncTaskExecutor exec = getContainerProperties().getListenerTaskExecutor();
if (exec == null) {
exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
// This has to run on another thread to avoid a deadlock on lifecycleMonitor
AsyncTaskExecutor exec = getContainerProperties().getListenerTaskExecutor();
if (exec == null) {
exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
}
exec.execute(this::start);
}
exec.execute(this::start);
}
}

private void publishConcurrentContainerStoppedEvent() {
ApplicationEventPublisher eventPublisher = getApplicationEventPublisher();
if (eventPublisher != null) {
eventPublisher.publishEvent(new ConcurrentContainerStoppedEvent(this));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ protected void doStart() {
}
}
this.listenerConsumer = new ListenerConsumer(listener, listenerType, observationRegistry);
this.thisOrParentContainer.childStarted(this);
setRunning(true);
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ default MessageListenerContainer getContainerFor(String topic, int partition) {
default void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason) {
}

/**
* Notify a parent container that a child container has started.
* @param child the container.
* @since 3.3
*/
default void childStarted(MessageListenerContainer child) {
}

@Override
default void destroy() {
stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
Expand All @@ -27,6 +28,7 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -59,6 +61,7 @@
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.event.ConcurrentContainerStoppedEvent;
import org.springframework.kafka.event.ContainerStoppedEvent;
import org.springframework.kafka.event.KafkaEvent;
import org.springframework.kafka.support.TopicPartitionOffset;
Expand Down Expand Up @@ -142,7 +145,7 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c

final CountDownLatch latch = new CountDownLatch(3);
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
final List<String> payloads = new ArrayList<>();
final List<String> payloads = Collections.synchronizedList(new ArrayList<>());
artembilan marked this conversation as resolved.
Show resolved Hide resolved
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
Expand All @@ -157,11 +160,15 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
container.setChangeConsumerThreadName(true);
BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
CountDownLatch stopLatch = new CountDownLatch(4);
CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1);
container.setApplicationEventPublisher(e -> {
events.add((KafkaEvent) e);
if (e instanceof ContainerStoppedEvent) {
stopLatch.countDown();
}
if (e instanceof ConcurrentContainerStoppedEvent) {
concurrentContainerStopLatch.countDown();
}
});
CountDownLatch intercepted = new CountDownLatch(4);
container.setRecordInterceptor((record, consumer) -> {
Expand Down Expand Up @@ -205,6 +212,7 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
container.getContainers().get(0).start();
container.stop();
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(concurrentContainerStopLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(container.isInExpectedState()).isTrue();
events.forEach(e -> {
assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container);
Expand All @@ -216,6 +224,14 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
assertThat(children).contains((KafkaMessageListenerContainer<Integer, String>) e.getSource());
}
}
else if (e instanceof ConcurrentContainerStoppedEvent) {
if (e.getSource().equals(container)) {
assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container);
}
else {
assertThatIllegalStateException();
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}
}
else {
assertThat(children).contains((KafkaMessageListenerContainer<Integer, String>) e.getSource());
}
Expand Down