Fix ConcurrentContainer lifecycle issues #3406
ConcurrentContainer start would be permitted only after all the containers running status is false.
Would you mind revising the logic so that these operations are idempotent?
So, if `start()` has been called before, that does not mean we cannot call it again.
Same for `stop()`.
If you think about these lifecycle hooks as idempotent operations, the logic would probably be much simpler, or the problem would go away altogether.
I am also curious how this fix correlates with your earlier fenced one.
At a glance they contradict each other.
Thanks
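
To illustrate the idempotency request above, here is a minimal sketch. `IdempotentContainer` and its `transitions` counter are hypothetical stand-ins invented for this example; they are not Spring Kafka classes, and the real container would do its work inside the guarded sections.

```java
// Hypothetical stand-in for a container; not the Spring Kafka implementation.
class IdempotentContainer {

    private boolean running;

    // Counts real state transitions, only so the demo can observe them.
    private int transitions;

    public synchronized void start() {
        if (this.running) {
            return; // already running: a repeated start() is a no-op
        }
        this.running = true;
        this.transitions++;
    }

    public synchronized void stop() {
        if (!this.running) {
            return; // already stopped: a repeated stop() is a no-op
        }
        this.running = false;
        this.transitions++;
    }

    public synchronized boolean isRunning() {
        return this.running;
    }

    public synchronized int getTransitions() {
        return this.transitions;
    }
}

class IdempotentContainerDemo {

    public static void main(String[] args) {
        IdempotentContainer container = new IdempotentContainer();
        container.start();
        container.start(); // ignored
        container.stop();
        container.stop(); // ignored
        System.out.println(container.getTransitions()); // prints 2
    }
}
```

With this shape, callers never need to check the state before calling `start()` or `stop()`; the second call simply returns.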
@@ -277,6 +277,10 @@ public boolean isRunning() {
return this.running;
}

protected boolean canStop() {
return this.running;
Why is `isRunning()` not enough?
It is totally OK to have extra logic in the overridden method in `ConcurrentMessageListenerContainer`.
Assume concurrency == 2.
Cmain -- concurrent container
C0, C1 -- child containers

- `start` the concurrent container.
  Cmain -- running
  C0 -- running
  C1 -- running
- `stop` container C0 manually.
  Cmain -- not running
  C0 -- not running
  C1 -- running
- `stop` the Cmain container.
  Under the earlier condition, the concurrent container's running status is false, so it will not be stopped.

A new condition has to be added to verify whether the child containers have really been cleared. This is equivalent to checking whether `stop` was called previously.
> stop container C0 manually.
> Cmain -- not running
> C0 -- not running
> C1 -- running

But this situation is not correct.
`Cmain` has to be running as long as any of its children is running.
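
The rule stated here (the parent counts as running while at least one child runs) can be sketched as follows. `ChildContainer` and `ParentContainer` are hypothetical, heavily reduced classes made up for this example, not the Spring Kafka types; the sketch only models the running flags for the concurrency == 2 scenario.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical child container, reduced to a running flag.
class ChildContainer {

    private volatile boolean running;

    void start() {
        this.running = true;
    }

    void stop() {
        this.running = false;
    }

    boolean isRunning() {
        return this.running;
    }
}

// Hypothetical parent whose running state is derived from its children.
class ParentContainer {

    private final List<ChildContainer> children = new ArrayList<>();

    ParentContainer(int concurrency) {
        for (int i = 0; i < concurrency; i++) {
            this.children.add(new ChildContainer());
        }
    }

    void start() {
        this.children.forEach(ChildContainer::start);
    }

    ChildContainer child(int index) {
        return this.children.get(index);
    }

    // Running as long as at least one child is still running.
    boolean isRunning() {
        return this.children.stream().anyMatch(ChildContainer::isRunning);
    }
}

class ParentContainerDemo {

    public static void main(String[] args) {
        ParentContainer cmain = new ParentContainer(2);
        cmain.start();
        cmain.child(0).stop(); // stop C0 manually
        System.out.println(cmain.isRunning()); // prints true, C1 still runs
        cmain.child(1).stop();
        System.out.println(cmain.isRunning()); // prints false
    }
}
```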
But as per the API definition in the `org.springframework.context.Lifecycle` interface:

> Check whether this component is currently running.
> In the case of a container, this will return true only if all components that apply are currently running.
> The `Cmain` has to be running until any of its children is running.

The `Cmain` ConcurrentContainer is not stopped. Only its running status will be set to `false`, to indicate that one or all containers are stopped, as per the API definition.
The problem with `false` for just one stopped child container is that the rest are still running, and we don't know about that from the parent container since, as you said, it has to return `false`.
If that is what you want to implement here, then it is not OK to allow stopping any child container individually.
If we allow it (and I don't see why not), then `false` for the parent container would lead to a resource leak in the next `start()` call.
Pay attention that we don't stop any running containers over there, but just create new instances.
Also pay attention that `this.startedContainers.set(0);` is wrong here, because we don't take into account the containers running at that moment.
And all of that just because we decided to return `false` when at least one child container is stopped.
> The problem with `false` just for one child container that the rest are still running and we don't know about that from parent container since, as you said, it has to return `false`.

If it returns `false`, that indicates that one, more, or all containers are stopped. We can find out which containers are running by iterating over the containers.
> If that is what you want to implement here, then it is not OK to allow to stop any child container individually.
> If we allow (and I don't see why not), then `false` for parent container would lead to the resource leak in the next `start()` call.

As per the existing logic, running containers are stopped if `stop` is called on `ConcurrentContainer`. There is no resource leak here. Please advise.
> Plus pay attention that `this.startedContainers.set(0);` is wrong here, because we don't take into account those running containers at the moment.

`this.startedContainers.set(0)` indicates that initially no child container has started. The counter is incremented once a child starts and decremented once it stops.
> The problem with `false` just for one child container that the rest are still running and we don't know about that from parent container since, as you said, it has to return `false`.

As mentioned earlier, I am fine with implementing this as per your comments: set the `running` status to `false` only if all the containers are stopped; otherwise, set it to `true`.
I will verify this one more time and confirm. I wanted to share my views, and I am looking for your final review of my earlier comments before I implement it as you suggested.
return true;
}
}
if ((!isRunning() && this.startedContainers.get() > 0)) {
I don't understand this extra logic.
The purpose of `isChildRunning()` is to report whether any child container is active at the moment.
Why do we need to check `!this.isRunning()` and then the number of started containers?
Didn't we discuss with you in the other PR (#3377) that it is abnormal to have a child container running while its parent is stopped?
The condition at line 216 is the critical logic. The `running` status of the ConcurrentContainer will be set to `false` under the following conditions.
- The ConcurrentContainer itself is stopped manually.
- One of the child containers is stopped for any reason.

The `startedContainers` count is decremented only when the actual container exits. So, I check whether the `ConcurrentContainer` is stopped and still has child containers processing messages. In that case, it should return `true`.
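
The counter behaviour described in this comment can be sketched roughly as below. The names `startedContainers`, `childStarted`, `childStopped` and `isChildRunning` come from the discussion, but the `ChildTracker` class and the method bodies are assumptions made for illustration, not the code under review.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical tracker; only the counter semantics from the discussion are modelled.
class ChildTracker {

    // Number of children that have started and not yet actually exited.
    private final AtomicInteger startedContainers = new AtomicInteger();

    // Assumed to be called by a child once it has really started.
    void childStarted() {
        this.startedContainers.incrementAndGet();
    }

    // Assumed to be called by a child only when it has really exited.
    void childStopped() {
        this.startedContainers.decrementAndGet();
    }

    // True while at least one child is still processing.
    boolean isChildRunning() {
        return this.startedContainers.get() > 0;
    }
}

class ChildTrackerDemo {

    public static void main(String[] args) {
        ChildTracker tracker = new ChildTracker();
        tracker.childStarted();
        tracker.childStarted();
        tracker.childStopped(); // first child has exited
        System.out.println(tracker.isChildRunning()); // prints true
        tracker.childStopped(); // second child has exited
        System.out.println(tracker.isChildRunning()); // prints false
    }
}
```

In this sketch the result depends only on the counter, independent of the parent's own `running` flag, which is the simplification the reviewer asks about.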
@@ -235,7 +243,7 @@ public boolean isChildRunning() {
*/
@Override
protected void doStart() {
if (!isRunning()) {
if (!isRunning() && this.containers.stream().allMatch(container -> !container.isRunning())) {
This again sounds like a contradiction to what we have with the fenced child container.
Why would one be running if the parent is stopped?
This condition prevents a `run` call on the ConcurrentContainer before the child containers have stopped. It verifies that all the containers are really stopped.
This is similar to the earlier logic: if the container is in the running state, subsequent `run` calls are ignored.
Earlier, the running status of the concurrent container was not set to false even if one of the containers was stopped. I have changed this so that the concurrent container's running status is set to false even if only one of the containers is stopped.
In this case, the condition that verifies whether the concurrent container is allowed to start needs to change: `run` is allowed only if the running status of all the containers is 'false'.
> In this case, condition to verify if concurrent is allowed to start needs to be changed. If all the containers running status is set to 'false' then only run is allowed.

That's also not how I think about this concurrent container logic.
It is in the running state when any of its child containers is running.
When we start the concurrent container, it is supposed to start all of its children.
If some of them are already running, then the call is idempotent.
As we stated before: we just don't allow orphaned child containers to be restarted.
Probably a much more robust solution is to stop all the children when we call start on their parent.
This way any new start would give us a fresh state. Kind of a total renewal.
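
A rough sketch of the "stop all the children on start, then begin fresh" idea follows. `RenewableChild` and `RenewingParent` are hypothetical classes invented for this example; the real container creates consumers and threads, which is left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical child, reduced to a running flag.
class RenewableChild {

    private volatile boolean running;

    void start() {
        this.running = true;
    }

    void stop() {
        this.running = false;
    }

    boolean isRunning() {
        return this.running;
    }
}

// Hypothetical parent that renews all of its children on every start().
class RenewingParent {

    private final int concurrency;

    private final List<RenewableChild> children = new ArrayList<>();

    RenewingParent(int concurrency) {
        this.concurrency = concurrency;
    }

    synchronized void start() {
        // Stop leftovers first so no orphan keeps running next to the new instances.
        for (RenewableChild child : this.children) {
            if (child.isRunning()) {
                child.stop();
            }
        }
        this.children.clear();
        // Then create and start a fresh set of children.
        for (int i = 0; i < this.concurrency; i++) {
            RenewableChild child = new RenewableChild();
            child.start();
            this.children.add(child);
        }
    }

    // Returns a snapshot so callers cannot modify the internal list.
    synchronized List<RenewableChild> getChildren() {
        return new ArrayList<>(this.children);
    }
}
```

Calling `start()` twice in a row then leaves exactly `concurrency` running children, and none of the earlier instances keeps running, which is the resource-leak concern raised earlier in the thread.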
> When we start concurrent container, it suppose to start all of its children.
> If some of them are running already, then idempotent.
> As we state before: we just don't allow orphaned child containers to be restarted.
> Probably much robust solution is to stop all the children when we call start of their parent.
> This way any new start would give us a fresh state. Kinda total renew.

This is already happening. I have not modified anything here.
> That's also not what I think about this concurrent container logic.
> It is in running state when any of its child containers running.

I have set the `running` status to `false` as per the API definition.
> I still believe that we have to return `true` if any of the children are running.

I am fine with this.
But one question: what do you suggest for finding out, from the API, whether any of the containers is stopped? Do we need to get all the containers and check whether any of them is stopped?
> Or leave it as is, because we just don't change concurrent `running` state even if we stop all its children manually.

I think here we must set the `running` status to `false` if all the containers are stopped. Let us not leave it as is.
Well, that's the `start()` of the concurrent container.
I think the best solution would be to stop all the currently running children and go ahead with the rest of the logic, where we really re-create the child containers.
I might agree that we can move the concurrent container to the not-running state if all of its children are stopped, if we have an API to notify via `thisOrParentContainer` that a child is stopped.

> Do we need to get all the containers and verify if any of it is stopped.

Why do we need to check if they are stopped?
I think iterating over `this.containers` and checking `isRunning()` is enough.
Is `isChildRunning()` OK?
> Well, that's `start()` of the concurrent container.
> I think the best solution would be to stop all the currently running and go ahead with the rest of the logic where we really re-create child container.

As per the current code:
The start API starts new child containers.
The stop API stops any running containers.
I feel this is correct. Let us not change anything here. I have added new conditions to verify whether the ConcurrentContainer is really stopped.
As per the earlier code, a second `run` is ignored. An application developer can directly `stop` and `run` the ConcurrentContainer if they plan to run it again. However, it is good practice to wait until the `isChildRunning` API returns `false`, which indicates that all the resources are released.
> Do we need to get all the containers and verify if any of it is stopped.
> Why do we need to check if they are stopped? I think iteration over `this.containers` and checking `isRunning()` is enough. Is `isChildRunning()` OK?

The `isChildRunning` API returns `false` only when all message processing has actually stopped. Otherwise, it always returns true. This expected and requested behavior is really useful, since it prevents doubling the capacity requirements on the Kafka cluster as well as on the JVM.
App developers need to iterate over the containers to check whether any of them is stopped. As per your suggestion, the ConcurrentContainer `running` status is set to `false` only when all the containers are stopped.
@@ -152,24 +152,22 @@ public synchronized void onApplicationEvent(ListenerContainerIdleEvent event) {
}

private synchronized void stopParentAndCheckGroup(MessageListenerContainer parent) {
if (parent.isRunning()) {
Why do you remove this condition?
I have removed the condition that verifies `running` is true. I changed this because the status of the `ConcurrentContainer` would be set to `false` if one of the containers is stopped. Earlier, it worked because the `running` status of the `ConcurrentContainer` was not set to `false` when any of the containers was stopped, and it was not set to true after it was started.
@Override
protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
String clientIdSuffixArg, Properties properties) {
This does not look like OK code formatting.
Can you revise it, please, so it looks nice from a review perspective?
let's step back and try to understand what is the problem at all. Would you mind explaining the reasoning behind your work? Thanks
Fenced child container issue is very clear and straightforward. Assume
I am here trying to fix the
How does it corrupt it with its start if we don't stop ConcurrentContainer?
Right. And that's what I mean with having ConcurrentContainer running until at least one of its child container is running.
Looks good to me. Only one issue, if we keep the But, If we have set the
As mentioned in my previous comment, I am not trying to stop
Right, but then it is chicken-egg problem.
Ok then i will keep the
Gentle reminder. I have updated the PR as per our discussion.
...fka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java
this.lifecycleLock.lock();
try {
if (this.containers.contains(child) || this.stoppedContainers.contains(child)) {
this.startedContainers.incrementAndGet();
This logic somehow does not compile in my head.
Why do we need to track those `stoppedContainers` at all?
It does not look like that brings any benefit over the `setFenced()` introduced before.
In my feeling, the `this.containers` collection is totally enough to track the whole lifecycle of the children if their `start/stop` is done manually.
Well, I still see a performance benefit in the `this.startedContainers` counter, but this `stoppedContainers` smells like overhead.
When the container's stop is called, all of `this.containers` are cleared. Assume not all containers have actually called `childStopped` yet. Later, those earlier child containers would invoke the `childStopped` API. How can we know whether these child containers belong to the ConcurrentContainer's active run or to the run that was just stopped? So, even though `stop` is called on the ConcurrentContainer, we need to keep the previous containers until a second run starts or all the child containers are really stopped.
`this.stoppedContainers` would be useful here.
Right. So, we must not do that `this.containers.clear();` at the end of `ConcurrentMessageListenerContainer.doStop(final Runnable callback, boolean normal)`.
And reset it when we do a new `ConcurrentMessageListenerContainer.start()`.
This way we would always keep track of the children until we go for a fresh start 😄
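
The suggestion above (keep the children after `stop()` and reset them only on the next `start()`) might look roughly like this. `TrackedChild` and `TrackingParent` are hypothetical classes for illustration, and the sketch assumes each child reports `childStopped` at most once; the real `doStop` and `start` do considerably more.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical child; only its identity matters in this sketch.
class TrackedChild {
}

// Hypothetical parent that keeps its children across stop() until the next start().
class TrackingParent {

    private final List<TrackedChild> containers = new ArrayList<>();

    // Children of the current run that have not reported childStopped yet.
    private final Set<TrackedChild> active = new HashSet<>();

    // A fresh start is the only place where the tracked children are reset.
    synchronized List<TrackedChild> start(int concurrency) {
        this.containers.clear();
        this.active.clear();
        for (int i = 0; i < concurrency; i++) {
            TrackedChild child = new TrackedChild();
            this.containers.add(child);
            this.active.add(child);
        }
        return new ArrayList<>(this.containers);
    }

    // Requests a stop but deliberately keeps this.containers intact,
    // so that late childStopped callbacks can still be matched.
    synchronized void stop() {
        // Stop requests to the children would be issued here.
    }

    // Returns false for a child that does not belong to the tracked run.
    synchronized boolean childStopped(TrackedChild child) {
        if (!this.containers.contains(child)) {
            return false;
        }
        this.active.remove(child);
        return true;
    }

    synchronized boolean isChildRunning() {
        return !this.active.isEmpty();
    }
}
```

Because the list survives `stop()`, a late `childStopped` from the stopped run is still recognised, while a callback from a child of an older run is rejected after the next `start()`, without a separate `stoppedContainers` collection.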
This commit would fix the following issues.
1) 'isChildRunning' API would return true only after all the containers are actually stopped.
2) ConcurrentContainer `start` would be permitted only after all the containers running status is false.
3) ConcurrentContainer `stop` would be permitted if the container is in running status or if previously `stop` API is not called.
4) Move the logic to verify whether to permit the `stop` call to KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.
5) Add 'stopAbnormally' in a Lock.
6) Set the ConcurrentContainer running status to true after `childStarted`
7) Set the ConcurrentContainer running status to false after `childStopped`
8) Call `childStarted` in ConcurrentContainer from KafkaMessageListenerContainer right before publishing ConsumerStartedEvent.
This commit fixes the question of exactly when the ConcurrentContainer has to be stopped. As per the earlier logic, the running status would not be set to false if any of the containers is stopped. This is not correct, so the logic is modified to set the running status to false even if only one of the containers is stopped. So, it is sufficient to call the stop API directly on the parent container, which internally checks whether all the containers are stopped and executes the callback accordingly.
As per the review comments, this commit reverts the changes related to the ConcurrentContainer `running` status. Summary of all changes in this PR.
1) 'isChildRunning' API would return true only after all the containers are actually stopped.
2) Add 'stopAbnormally' in a Lock.
3) Call `childStarted` in ConcurrentContainer from KafkaMessageListenerContainer right before publishing ConsumerStartedEvent.
This commit would change the time at which the childContainers are cleared. Earlier, childContainers are cleared during `stop` call. But, after this change childContainers would be cleared only during the next `start` call.
This commit would include the following changes.
1) Clear all the containers after all the child containers stopped. Previous commit clears only during the fresh start.
2) Publish `ConcurrentContainerStoppedEvent` when the ConcurrentContainer and all the child containers are stopped. But, previously `ConcurrentContainerStoppedEvent` is emitted when all the containers are stopped.
I have updated the PR as per our discussion. Please review and give your comments.
Thanks for the great contribution @LokeshAlamuri; looking forward to more! We will try to back-port the fix to those supported versions...
OK. We cannot back-port it to previous versions. Feel free to contribute PRs for those supported versions. Hope the problem you are trying to fix is not that critical if we deal with container lifecycle properly.
This commit would fix the issue.
childStarted
in ConcurrentContainer from KafkaMessageListenerContainer right before publishing ConsumerStartedEvent.