rpc/conn_cache: eager cleanup of connection cache on shutdown #8847
Conversation
A couple of alternate approaches I prototyped but didn't like
Rather, cleaning up at the cache layer seemed less complicated and easier to reason about. Wdyt?
/ci-repeat 10
@bharathv got a merge conflict
Force-pushed from 413262f to 419da08.
Rebased.
I like the idea of having a sharded top-level abort source that we can plumb down starting at the application layer. Curious if you have thoughts on how much or how little we should extend this to other subsystems.
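For context, a minimal sketch of what such plumbing could look like, assuming a hypothetical top-level `application` type that owns a `ss::sharded<ss::abort_source>`; none of these names come from the PR itself, this is only an illustration of the pattern.

```cpp
#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/sharded.hh>

namespace ss = seastar;

// Hypothetical application-level wiring, not the actual Redpanda code: a
// sharded abort source owned at the top level that subsystems subscribe to.
struct application {
    ss::sharded<ss::abort_source> _as;

    ss::future<> start() {
        // One abort_source per shard; subsystems receive a reference to the
        // shard-local instance when they are constructed.
        co_await _as.start();
    }

    ss::future<> shutdown() {
        // Fan the abort out to every shard before stopping the subsystems
        // that subscribed to it.
        co_await _as.invoke_on_all(
          [](ss::abort_source& as) { as.request_abort(); });
        co_await _as.stop();
    }
};
```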
LGTM pending CI
return ss::do_with(
  cache.get(node_id),
  [connection_timeout = connection_timeout.timeout_at(),
   f = std::forward<Func>(f)](auto& transport_ptr) mutable {
      return transport_ptr->get_connected(connection_timeout)
        .then([f = std::forward<Func>(f)](
Just curious if you tried coroutinizing this as some unsharded template method, and if you did but opted for this, what was the snag? The inferred signature? The convert() calls?
I didn't try coro-ing this; just playing it safe, I didn't want to introduce new bugs.
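For reference, a rough sketch of the coroutine shape being asked about, under the assumption that `Func` returns a future; the helper name `with_connected` is hypothetical, and the error conversion plus the deduced return type of `Func` (the likely snag) are deliberately left out.

```cpp
#include <seastar/core/coroutine.hh>

// Hypothetical coroutine form of the ss::do_with chain above, not the real
// helper; connection_cache, model::node_id and rpc::clock_type are the
// surrounding project types.
template<typename Func>
ss::future<> with_connected(
  connection_cache& cache,
  model::node_id node_id,
  rpc::clock_type::time_point timeout,
  Func f) {
    // Holding the shared pointer in the coroutine frame replaces the
    // lifetime extension that ss::do_with provides in the continuation form.
    auto transport_ptr = cache.get(node_id);
    auto connected = co_await transport_ptr->get_connected(timeout);
    co_await f(connected);
}
```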
@@ -61,11 +61,14 @@ ss::future<> connection_cache::remove(model::node_id n) {
/// \brief closes all client connections
ss::future<> connection_cache::stop() {
    auto units = co_await _mutex.get_units();
    co_await parallel_for_each(_cache, [](auto& it) {
    // Exchange ensures the cache is invalidated and concurrent
    // accesses wait on the mutex to populate new entries.
We were holding the lock before, so wouldn't the concurrent accesses have been waiting (or maybe the cache was checked without acquiring the lock)?
or maybe the cache was checked without acquiring the lock
This, via connection_cache::contains() and get().
        auto& [_, cli] = it;
        return cli->stop();
    });
    _cache.clear();
    cache.clear();
The cache is now going to go out of scope on the next line and be cleared in any case.
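Putting the quoted fragments of the hunk together, the stop() path under discussion looks roughly like the sketch below; anything outside the quoted diff lines is an assumption about the surrounding code.

```cpp
ss::future<> connection_cache::stop() {
    auto units = co_await _mutex.get_units();
    // Exchange invalidates the member map while the mutex is held, so a
    // concurrent accessor that races with shutdown finds no entry and has
    // to take the mutex before repopulating.
    auto cache = std::exchange(_cache, {});
    co_await parallel_for_each(cache, [](auto& it) {
        auto& [_, cli] = it;
        return cli->stop();
    });
    // cache is a local here, so the explicit clear() flagged in the review
    // is redundant: it is destroyed when the coroutine frame unwinds.
}
```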
app_signal.wait().get();
trigger_abort_source();
There is an abort_source in app_signal already. Can we use (or expand and use) that?
I did begin with that thought, but I ran into an issue: invoking this new sharded abort_source via invoke_all() returns a future, but I can't block on it in app_signal's abort_source callback. To work around it, I would have to gate on it in an async fiber, and that seemed unnecessarily complex.
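A minimal sketch of the resulting shutdown flow, with `trigger_abort_source()` and `_connection_cache_as` as assumed names: the signal wait runs in an ss::async fiber, where blocking on the invoke_on_all() future with .get() is allowed, unlike inside the abort_source callback.

```cpp
// Assumed member names, for illustration only; the point is where the
// blocking is allowed to happen.
void application::run_until_signal() {
    // This runs inside an ss::async fiber, so blocking on futures with
    // .get() is fine here, unlike inside app_signal's abort_source callback.
    app_signal.wait().get();
    trigger_abort_source();
}

void application::trigger_abort_source() {
    // invoke_on_all returns a future; calling it from the ss::async fiber
    // lets us block on it directly instead of gating on a background fiber.
    _connection_cache_as
      .invoke_on_all([](ss::abort_source& as) { as.request_abort(); })
      .get();
}
```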
Thanks for the reviews. I forgot about this one; addressing comments, and I will shortly rebase to fix conflicts.
Force-pushed from 419da08 to 5b51cae.
Last force-push is a rebase to fix conflicts.
No logic changes.
Introduces a decoupled shutdown method that clears the transport map.
Abort source to be used as a trigger for shutting down the connection cache.
Currently we use a raw transport pointer at the protocol level and it is unsafe. Replace it with a shared pointer.
There is an occasional UAF due to early shutdown of the cache if we do not keep the transport pointer alive until the send finishes.
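To illustrate the raw-pointer-to-shared-pointer change described in these commit messages, a self-contained sketch using a stand-in `fake_transport` type rather than the real rpc::transport API:

```cpp
#include <chrono>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sleep.hh>

namespace ss = seastar;

// fake_transport stands in for rpc::transport; the real send path differs.
struct fake_transport {
    ss::future<> send_bytes() {
        return ss::sleep(std::chrono::milliseconds(100));
    }
};

using transport_ptr = ss::lw_shared_ptr<fake_transport>;

ss::future<> safe_send(transport_ptr t) {
    // The capture copies the shared pointer, so the transport outlives the
    // in-flight send even if connection_cache::stop() drops its reference
    // concurrently; with a raw pointer this is the occasional UAF described
    // in the commit message.
    return t->send_bytes().finally([t] {});
}
```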
Force-pushed from 5b51cae to 6946abb.
Last force-push is a rebase.
/ci-repeat
LGTM, the CI failures look like known flakes.
/ci-repeat
Test failure: #9315 (unrelated, known issue).
At shutdown, eager cleanup of connection cache entries can result in faster termination of RPCs/connections to suspended nodes. This patch includes two main changes.
Fixes #7981
Backports Required
UX Changes
Release Notes