ConcurrentMessageListenerContainer isChildRunning API is returning false even though active MessageListenerContainer instances are processing messages. #3338

Closed
LokeshAlamuri opened this issue Jun 29, 2024 · 11 comments

@LokeshAlamuri
Contributor

LokeshAlamuri commented Jun 29, 2024

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.1.3

Application

A Spring Boot application uses the spring-kafka module to consume and process messages. The MessageListenerContainer concurrency is 4, and CommonContainerStoppingErrorHandler is configured to stop the container on any failure. Assume it takes 60 seconds to process any message.

Describe the bug

One of the MessageListenerContainer instances throws a fatal exception, and CommonContainerStoppingErrorHandler stops the MessageListenerContainer. If we then immediately check the state of the MessageListenerContainer instances with the following code snippet, it reports that no MessageListenerContainer instances are running, because the state is prematurely updated to not running.

code:
org.springframework.kafka.listener.ListenerContainerRegistry listenerContainerRegistry;

listenerContainerRegistry.getListenerContainers().stream()
		.filter(messageListenerContainer -> messageListenerContainer.isChildRunning()).count();

This returns zero (0). Ideally, it should return zero only after all the instances have stopped processing messages.

It takes some time for the other MessageListenerContainer instances to finish processing their messages.

To Reproduce

Any spring-kafka consumer application

Expected behavior

org.springframework.kafka.listener.ListenerContainerRegistry listenerContainerRegistry;

listenerContainerRegistry.getListenerContainers().stream()
		.filter(messageListenerContainer -> messageListenerContainer.isChildRunning()).count();

This should not return zero (0) while messages are still being processed.

The above code should return zero only after a closed notification has been received from all the MessageListenerContainer instances.

Problem
From the application's perspective, the state of the Kafka consumer component is reported incorrectly. This is a serious problem if we plan to shut down the application based on the reported state: because of this bug, the application would be stopped prematurely.

As of now, the MessageListenerContainer APIs do not report exactly when all the MessageListenerContainer instances have completed processing.

Sample

https://github.com/LokeshAlamuri/SpringBootKafkaConsumerDemo/blob/main/src/main/java/com/example/springboot/ConsumerStoppedEventListener.java

Root Cause

As per my analysis, the 'isChildRunning' API returns false based on the 'running' variable of org.springframework.kafka.listener.ConcurrentMessageListenerContainer, not based on the active MessageListenerContainer instances.

Solution

The 'isChildRunning' API should return false only once all child MessageListenerContainer instances have closed, by tracking the count of closed instances as notifications arrive through the 'childStopped' method.
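The idea can be illustrated with a small JDK-only model. This is a sketch under stated assumptions, not the spring-kafka implementation: `ParentContainerModel` and its fields are hypothetical names, and only the method names `isChildRunning` and `childStopped` are borrowed from the discussion above.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simplified, hypothetical model of a parent container with N children.
 * It does not mirror the real ConcurrentMessageListenerContainer internals.
 */
class ParentContainerModel {

	// Number of children that have not yet reported that they stopped.
	private final AtomicInteger activeChildren = new AtomicInteger();

	private volatile boolean running;

	/** Start the parent with the given number of child containers. */
	public void start(int concurrency) {
		this.activeChildren.set(concurrency);
		this.running = true;
	}

	/** Request a stop: no new work is accepted, in-flight work may continue. */
	public void stop() {
		this.running = false;
	}

	/** Callback invoked by each child once its processing has really ended. */
	public void childStopped() {
		this.activeChildren.decrementAndGet();
	}

	/** Lifecycle view: false as soon as stop() has been requested. */
	public boolean isRunning() {
		return this.running;
	}

	/** Proposed view: true until every child has reported that it stopped. */
	public boolean isChildRunning() {
		return this.activeChildren.get() > 0;
	}

	public static void main(String[] args) {
		ParentContainerModel parent = new ParentContainerModel();
		parent.start(4);
		parent.stop();
		// Stop was requested, but no child has finished yet.
		System.out.println(parent.isRunning() + " " + parent.isChildRunning());
		for (int i = 0; i < 4; i++) {
			parent.childStopped();
		}
		// All four children have reported in.
		System.out.println(parent.isRunning() + " " + parent.isChildRunning());
	}
}
```

In this model, the parent's own `running` flag and the children's state are tracked separately, so a caller polling `isChildRunning()` right after `stop()` still sees the in-flight children.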

@artembilan
Member

Hi @LokeshAlamuri !

Is the problem still present after we made the fix in the PR #3347?

Thanks

Can we treat this as a duplicate and close it accordingly?

@LokeshAlamuri
Contributor Author

Issue from the API side is not fixed. I shall make the initial changes and provide them for your review.

@artembilan
Member

Looking at your concern again, I would say that the behavior is correct.
It has always been like that, and it is like that everywhere.
The stopped state means that this component does not accept new requests anymore.
In our case, the MessageListenerContainer does not poll the Kafka broker anymore.
All the messages currently being handled by the MessageListener are out of scope of this state change.

You might do better to look at something called "graceful shutdown": https://docs.spring.io/spring-boot/reference/web/graceful-shutdown.html#page-title.

There is ContainerProperties.setListenerTaskExecutor() for your consideration.
The default one (created on the fly) is SimpleAsyncTaskExecutor, which does interrupt threads on its close, but does not wait for their completion by default:

	/**
	 * Specify a timeout (in milliseconds) for task termination when closing
	 * this executor. The default is 0, not waiting for task termination at all.
	 * <p>Note that a concrete >0 timeout specified here will lead to the
	 * wrapping of every submitted task into a task-tracking runnable which
	 * involves considerable overhead in case of a high number of tasks.
	 * However, for a modest level of submissions with longer-running
	 * tasks, this is feasible in order to arrive at a graceful shutdown.
	 * <p>Note that {@code SimpleAsyncTaskExecutor} does not participate in
	 * a coordinated lifecycle stop but rather just awaits task termination
	 * on {@link #close()}.
	 * @param timeout the timeout in milliseconds
	 * @since 6.1
	 * @see #close()
	 * @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setAwaitTerminationMillis
	 */
	public void setTaskTerminationTimeout(long timeout) {

You may consider injecting any other AsyncTaskExecutor implementation that can do a graceful shutdown for you.
See more info in the ThreadPoolTaskExecutor Javadocs.
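To make the "wait for completion on close" idea concrete, here is a minimal sketch using only `java.util.concurrent`. It deliberately uses a plain JDK `ExecutorService` rather than Spring's executors, and `GracefulExecutorSketch` and `runAndClose` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical JDK-only illustration of a termination timeout on close. */
class GracefulExecutorSketch {

	/**
	 * Submits one slow task, then closes the executor and waits up to
	 * timeoutMillis for it. Returns true if the task ran to completion.
	 */
	public static boolean runAndClose(long taskMillis, long timeoutMillis) {
		ExecutorService executor = Executors.newSingleThreadExecutor();
		AtomicBoolean completed = new AtomicBoolean();
		executor.submit(() -> {
			try {
				Thread.sleep(taskMillis); // stands in for a long-running listener call
				completed.set(true);
			}
			catch (InterruptedException ex) {
				Thread.currentThread().interrupt();
			}
		});
		executor.shutdown(); // no new tasks accepted; the running task continues
		try {
			if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
				executor.shutdownNow(); // timeout elapsed: interrupt what is left
			}
		}
		catch (InterruptedException ex) {
			executor.shutdownNow();
			Thread.currentThread().interrupt();
		}
		return completed.get();
	}

	public static void main(String[] args) {
		// Generous timeout: the task is allowed to finish.
		System.out.println(runAndClose(100, 5000));
		// Timeout of 0: the task is interrupted instead of awaited.
		System.out.println(runAndClose(5000, 0));
	}
}
```

With a timeout of 0 the close does not wait at all, which corresponds to the default described in the Javadoc above; a positive timeout trades a slower close for letting in-flight work finish.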

The point is that stopping a component does not mean that the application does not work anymore.

Therefore I'm leaning toward closing this as Works as Designed.
And as I said: it is like that everywhere in Spring where we implement org.springframework.context.Lifecycle contract.

@LokeshAlamuri
Contributor Author

I agree with you. There are multiple ways to shut down the application gracefully. Even a stop() call on org.springframework.kafka.listener.ConcurrentMessageListenerContainer performs a graceful shutdown of the spring-kafka message listener component. But the API definition of isChildRunning() gives the impression that all the containers have stopped processing. These APIs could be made more accurate by leveraging the childStopped() call. Please let me know if this issue should be closed; I shall close it.

@LokeshAlamuri
Contributor Author

LokeshAlamuri commented Jul 15, 2024

Let us set the use case aside. One query.
If isChildRunning returns 'false' immediately after the stop process is initiated, how can the framework user know through an API call when the actual processing completes? (As of now, we can find out from ConcurrentContainerStoppedEvent.)
I think this has to be corrected to return false only when no message processing is actually happening, so that it reports the actual state of the spring-kafka component.
I have an idea on how to fix this issue. I will make the complete changes and provide them for your review. Could you please evaluate this issue one more time?

@artembilan
Member

OK.
I think you are indeed talking about a graceful shutdown.
At first I was going to reject your request, but now I see that in Spring AMQP we do have such logic:

	/**
	 * The time to wait for workers in milliseconds after the container is stopped. If any
	 * workers are active when the shutdown signal comes they will be allowed to finish
	 * processing as long as they can finish within this timeout. Defaults
	 * to 5 seconds.
	 * @param shutdownTimeout the shutdown timeout to set
	 */
	public void setShutdownTimeout(long shutdownTimeout) {

Which is used like:

		Runnable awaitShutdown = () -> {
			logger.info("Waiting for workers to finish.");
			try {
				boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
				if (finished) {
					logger.info("Successfully waited for workers to finish.");
				}

The special ActiveObjectCounter abstraction is implemented there in Spring AMQP, but I believe it can be achieved some other way.
I guess the default shutdownTimeout should be 0 to avoid any breaking changes, since your request is a new behavior for the whole container abstraction.
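As one possible "other way", a counter of active workers with a timed wait can be sketched with plain JDK locks. This is an assumption-laden illustration, not Spring AMQP's ActiveObjectCounter: the class `ActiveWorkerCounter` and its methods are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical counter of in-flight workers with a timed wait for idleness. */
class ActiveWorkerCounter {

	private final ReentrantLock lock = new ReentrantLock();

	private final Condition idle = this.lock.newCondition();

	private int active;

	/** Record that a worker has started processing. */
	public void workerStarted() {
		this.lock.lock();
		try {
			this.active++;
		}
		finally {
			this.lock.unlock();
		}
	}

	/** Record that a worker has finished; wake waiters when none are left. */
	public void workerFinished() {
		this.lock.lock();
		try {
			if (this.active > 0 && --this.active == 0) {
				this.idle.signalAll();
			}
		}
		finally {
			this.lock.unlock();
		}
	}

	/**
	 * Wait until no workers are active or the timeout elapses.
	 * A timeout of 0 returns the current state without waiting.
	 */
	public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
		long nanos = unit.toNanos(timeout);
		this.lock.lock();
		try {
			while (this.active > 0) {
				if (nanos <= 0L) {
					return false;
				}
				nanos = this.idle.awaitNanos(nanos);
			}
			return true;
		}
		finally {
			this.lock.unlock();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		ActiveWorkerCounter counter = new ActiveWorkerCounter();
		counter.workerStarted();
		// Timeout 0: report that a worker is still active, without waiting.
		System.out.println(counter.await(0, TimeUnit.MILLISECONDS));
		Thread worker = new Thread(() -> {
			try {
				Thread.sleep(100); // simulated message processing
			}
			catch (InterruptedException ex) {
				Thread.currentThread().interrupt();
			}
			counter.workerFinished();
		});
		worker.start();
		// Generous timeout: returns once the worker reports completion.
		System.out.println(counter.await(5, TimeUnit.SECONDS));
	}
}
```

A shutdown timeout of 0 maps naturally onto `await(0, ...)`: the caller learns whether workers are still active but is never blocked, which keeps the existing non-waiting behavior as the default.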

Let me know if that makes sense to you and you are OK with going forward on the fix!

@LokeshAlamuri
Contributor Author

The issue is about the isChildRunning API returning false even though actual message processing is still happening. I have requested a fix because the framework user is unable to find the exact state of the spring-kafka component. Ideally, anyone would expect the API to return false only after the processing is completed. This can be easily identified through the childStopped API.

The points you are mentioning are a bit different; they relate to the stop API in org.springframework.context.Lifecycle.

@LokeshAlamuri
Contributor Author

> Since your request is a new behavior for the whole container abstraction.

My request does not modify the existing behavior, but provides the right state of the spring-kafka subsystem.

@LokeshAlamuri
Contributor Author

> I guess a default shutdownTimeout should be like 0 to avoid any breaking changes.
> Since your request is a new behavior for the whole container abstraction.

We are already doing this in AbstractMessageListenerContainer, but the default is 10000 milliseconds. Let me know if you want me to change this to 0.

	public final void stop(boolean wait) {
		this.lifecycleLock.lock();
		try {
			if (isRunning()) {
				if (wait) {
					final CountDownLatch latch = new CountDownLatch(1);
					doStop(latch::countDown);
					try {
						latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR
						publishContainerStoppedEvent();
					}
					catch (@SuppressWarnings("unused") InterruptedException e) {
						Thread.currentThread().interrupt();
					}
				}
				else {
					doStop(this::publishContainerStoppedEvent);
				}
			}
		}
		finally {
			this.lifecycleLock.unlock();
		}
	}

@artembilan
Member

OK! Thank you for looking into that!

Apparently you are fully on board with the code.

So, we have everything that could give us a graceful shutdown.
Your only concern is that isChildRunning() returns false while the child containers have not actually stopped yet.

I will be glad to see the fix from you.

It won't make it into the release for today though.

Thank you!

@LokeshAlamuri
Contributor Author

Could you please review PR #3406 and give your comments?
