Shutdown ClusterTopologyRefreshTask properly #2985
Delay.delay(Duration.ofMillis(1500));
assertThat(clusterClient.isTopologyRefreshInProgress()).isTrue();
Hi, I'm not sure about this.
Topology refresh in the test environment is quick, and there is no guarantee that we are in the right state for the test.
Most likely it will either not have started yet or have already completed when the assert is performed, making the test flaky. We are also adding a delay to the tests as a whole.
I ran the suggested test and it failed 10/10 times on the assert.
Do you have any idea how to reproduce the issue?
Do you mean how to reproduce the failing test locally or the actual issue?
For the actual issue "java.util.concurrent.RejectedExecutionException", I tried to reproduce it but could not.
I will spend some more time on it next week and see if I can come up with an approach.
Hi @thachlp,
I took a brief look and I think this issue can be tested more easily with a unit test. A similar one that tests the client shutdown order is already available. You can take a look at it here
I suggest using a unit test to confirm that the pending ClusterTopologyRefreshTask is canceled/completed before shutting down the Executor group. We can inject a mock of the ClusterTopologyRefreshTask and complete it after client shutdown is initiated.
Hope it helps
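The ordering check suggested above could be sketched roughly as follows, using only JDK classes in place of the Lettuce types. This is an illustration under assumptions, not the project's test: `ShutdownOrderSketch`, `pendingRefresh`, and `shutdown()` are hypothetical stand-ins for the client, the mocked ClusterTopologyRefreshTask, and the client shutdown path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a client that owns an executor and a pending refresh.
public class ShutdownOrderSketch {

    // Records lifecycle events so a test can assert on their order.
    final List<String> events = new ArrayList<>();

    // Stand-in for the pending refresh task; the test completes it manually.
    final CompletableFuture<Void> pendingRefresh = new CompletableFuture<>();

    final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Shuts the executor down only after the pending refresh has finished.
    CompletableFuture<Void> shutdown() {
        return pendingRefresh.thenRun(() -> {
            events.add("refresh-done");
            executor.shutdown();
            events.add("executor-shutdown");
        });
    }
}
```

A test would call `shutdown()`, assert that the executor is still running, complete `pendingRefresh`, and then assert that the executor was shut down only afterwards.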
Hey @thachlp,
Thanks for giving this fix a go. I think, however, you may be on the wrong path.
Judging from the stack trace in #2904, the ClusterTopologyRefreshScheduler attempts to refresh the topology AFTER the connections have been closed and the client is shutting down.
The suspendTopologyRefresh() method is supposed to suspend any topology refresh tasks, but it seems there is some case (a race condition, perhaps?) where a task is still executed during shutdown.
From the Java docs of
From my view, when we shut down RedisClusterClient, we should STOP running tasks and CANCEL scheduled tasks; that's why I wrote STOP running tasks. Thanks @tishun for explaining this to me. Do you have any suggestions for the fix?
I will try to get back to you by the end of the week.
This PR introduces a check for a very specific scenario. The change doesn't necessarily lead to a proper cancellation, as the task itself is composed of a series of refresh steps that are coupled through completable futures. Specifically, I think conceptually the easiest approach is to synchronize (and wait) until
It would also require a bit of housekeeping, e.g.
if (isEventLoopActive()) {
    clientResources.eventExecutorGroup().submit(clusterTopologyRefreshTask);
    return true;
}
isn't atomic,
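To illustrate why a check followed by a submit is not atomic, here is a minimal sketch with a plain `ExecutorService` standing in for the event executor group. `SubmitRaceSketch` and `trySubmit` are hypothetical names, and handling the rejection is only one possible way to tolerate the race, not necessarily the fix adopted in this PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class SubmitRaceSketch {

    // Hypothetical helper: returns true if the task was accepted,
    // false if the executor was already shut down.
    static boolean trySubmit(ExecutorService executor, Runnable task) {
        if (executor.isShutdown()) {
            return false;
        }
        try {
            // Another thread may shut the executor down between the check above
            // and this call, so the rejection still has to be handled here.
            executor.submit(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }
}
```

The check alone narrows the window but cannot close it; only the `catch` (or an external lock shared with the shutdown path) makes the outcome well defined.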
@tishun @mp911de @thachlp
fbb3951 to d8507a5
Thanks @mp911de @tishun for the reviews
Hey @thachlp, The Then when the event loop is shut down it would print this message. What we need to do is:
I think we are getting close to what we want to have.
It would be very hard to test this, but if you can think of some unit test that would also be handy. This is good to have, but not mandatory.
Thank you for spending time on this!
refreshLock.lock();
try {
    if (compareAndSet(false, true)) {
        doRun();
        return;
    }

    if (logger.isDebugEnabled()) {
        logger.debug("ClusterTopologyRefreshTask already in progress");
    }
} finally {
    refreshLock.unlock();
I think we do not need to make any changes here:
- the compareAndSet makes sure there is only one thread that starts a refresh
- the suspendTopologyRefresh should stop any new topology refresh attempts
refreshLock.lock();
try {
    set(false);
    refreshComplete.signalAll();
} finally {
    refreshLock.unlock();
}
I think we need to lock before initiating the topology refresh with reloadTopologyAsync.get() and then unlock after we call set(false).
ReentrantLock refreshLock = topologyRefreshScheduler.getRefreshLock();
Condition refreshComplete = topologyRefreshScheduler.getRefreshComplete();

refreshLock.lock();
try {
    while (topologyRefreshScheduler.isTopologyRefreshInProgress()) {
        try {
            refreshComplete.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
} finally {
    refreshLock.unlock();
}
Since we know there is only one thread that started a refresh we do not need to wait for more than one lock to be released. We need one lock to acquire (I'd not use a spinlock here, because if a refresh is in progress it might take some time) and then we call the super method, e.g.
refreshLock.lock();
try {
    return super.shutdownAsync(quietPeriod, timeout, timeUnit);
} finally {
    refreshLock.unlock();
}
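Putting the two review suggestions together (hold the lock for the duration of a refresh, and take the same lock before shutting down), a minimal self-contained sketch might look like the one below. All names (`LockedShutdownSketch`, `refresh`, `suspended`) are hypothetical, and the refresh work is assumed to run synchronously on the locking thread: a `ReentrantLock` must be released by the thread that acquired it, so a refresh that completes asynchronously on another thread would need a different mechanism.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class LockedShutdownSketch {

    private final ReentrantLock refreshLock = new ReentrantLock();

    // Set once shutdown begins so that no new refresh starts afterwards.
    private final AtomicBoolean suspended = new AtomicBoolean();

    final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Runs one refresh while holding the lock; skipped once refreshes are suspended.
    boolean refresh(Runnable work) {
        refreshLock.lock();
        try {
            if (suspended.get()) {
                return false;
            }
            work.run();
            return true;
        } finally {
            refreshLock.unlock();
        }
    }

    // Suspends new refreshes, then blocks on the lock until any running refresh
    // has released it, and only then shuts the executor down.
    void shutdown() {
        suspended.set(true);
        refreshLock.lock();
        try {
            executor.shutdown();
        } finally {
            refreshLock.unlock();
        }
    }
}
```

Because `shutdown()` blocks on `lock()` rather than spinning, a long-running refresh simply delays the shutdown instead of burning CPU.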
Issue: #2904
Make sure that:
You applied code formatting rules using the mvn formatter:format target. Don’t submit any formatting related changes.