Skip to content

Commit

Permalink
rpc/connection_cache: protect spawned tasks with gate
Browse files Browse the repository at this point in the history
Prevents the sharded connection_cache from being stopped while there are
in-flight tasks for it.
  • Loading branch information
ballard26 committed Oct 11, 2023
1 parent 92549f6 commit 71a1fb0
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
10 changes: 6 additions & 4 deletions src/v/rpc/connection_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,12 @@ ss::future<> connection_cache::reset_client_backoff(
return ss::now();
}

return container().invoke_on(
*shard, [node_id](rpc::connection_cache& cache) mutable {
cache._cache.reset_client_backoff(node_id);
});
return ss::with_gate(_gate, [this, node_id, shard] {
return container().invoke_on(
*shard, [node_id](rpc::connection_cache& cache) mutable {
cache._cache.reset_client_backoff(node_id);
});
});
}

} // namespace rpc
31 changes: 20 additions & 11 deletions src/v/rpc/connection_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rpc/types.h"
#include "utils/mutex.h"

#include <seastar/core/gate.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>

Expand Down Expand Up @@ -141,17 +142,25 @@ class connection_cache final
rpc::make_error_code(errc::missing_node_rpc_client));
}

return container().invoke_on(
*shard,
[node_id, f = std::forward<Func>(f), connection_timeout](
connection_cache& cache) mutable {
if (cache.is_shutting_down()) {
return ss::futurize<ret_t>::convert(
rpc::make_error_code(errc::shutting_down));
}

return cache._cache.with_node_client<Protocol, Func>(
node_id, connection_timeout, std::forward<Func>(f));
return ss::with_gate(
_gate,
[this,
node_id,
connection_timeout,
shard,
f = std::forward<Func>(f)]() mutable {
return container().invoke_on(
*shard,
[node_id, f = std::forward<Func>(f), connection_timeout](
connection_cache& cache) mutable {
if (cache.is_shutting_down()) {
return ss::futurize<ret_t>::convert(
rpc::make_error_code(errc::shutting_down));
}

return cache._cache.with_node_client<Protocol, Func>(
node_id, connection_timeout, std::forward<Func>(f));
});
});
}

Expand Down

0 comments on commit 71a1fb0

Please sign in to comment.