Skip to content

Commit

Permalink
Merge pull request #11584 from ztlpn/fix-9243
Browse files Browse the repository at this point in the history
rpc/conn_cache: return errc::shutting_down after shutdown
  • Loading branch information
ztlpn authored Jun 21, 2023
2 parents cfd356e + f3640f1 commit d93e686
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/v/cluster/cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ cluster::errc map_update_interruption_error_code(std::error_code ec) {
case rpc::errc::client_request_timeout:
case rpc::errc::connection_timeout:
return errc::timeout;
case rpc::errc::shutting_down:
return errc::shutting_down;
case rpc::errc::disconnected_endpoint:
case rpc::errc::exponential_backoff:
case rpc::errc::missing_node_rpc_client:
Expand Down
2 changes: 1 addition & 1 deletion src/v/redpanda/admin_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1009,8 +1009,8 @@ ss::future<> admin_server::throw_on_error(
case rpc::errc::success:
co_return;
case rpc::errc::disconnected_endpoint:
[[fallthrough]];
case rpc::errc::exponential_backoff:
case rpc::errc::shutting_down:
throw ss::httpd::base_exception(
fmt::format("Not ready: {}", ec.message()),
ss::http::reply::status_type::service_unavailable);
Expand Down
1 change: 1 addition & 0 deletions src/v/rpc/connection_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ ss::future<> connection_cache::remove_all() {
/// \brief closes all client connections
ss::future<> connection_cache::do_shutdown() {
auto units = co_await _mutex.get_units();
_shutting_down = true;
// Exchange ensures the cache is invalidated and concurrent
// accesses wait on the mutex to populate new entries.
auto cache = std::exchange(_cache, {});
Expand Down
5 changes: 5 additions & 0 deletions src/v/rpc/connection_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ class connection_cache final
shard,
[node_id, f = std::forward<Func>(f), connection_timeout](
rpc::connection_cache& cache) mutable {
if (cache._shutting_down) {
return ss::futurize<ret_t>::convert(
rpc::make_error_code(errc::shutting_down));
}
if (!cache.contains(node_id)) {
// No client available
return ss::futurize<ret_t>::convert(
Expand Down Expand Up @@ -159,6 +163,7 @@ class connection_cache final
transport_version _default_transport_version{transport_version::v2};
ss::gate _gate;
ss::optimized_optional<ss::abort_source::subscription> _as_subscription;
bool _shutting_down = false;
};
inline ss::shard_id connection_cache::shard_for(
model::node_id self,
Expand Down
3 changes: 3 additions & 0 deletions src/v/rpc/errc.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ enum class errc {
method_not_found,
version_not_supported,
connection_timeout,
shutting_down,

// Used when receiving an undefined errc (e.g. from a newer version of
// Redpanda).
Expand All @@ -49,6 +50,8 @@ struct errc_category final : public std::error_category {
return "rpc::errc::version_not_supported";
case errc::connection_timeout:
return "rpc::errc::connection_timeout";
case errc::shutting_down:
return "rpc::errc::shutting_down";
default:
return "rpc::errc::unknown(" + std::to_string(c) + ")";
}
Expand Down

0 comments on commit d93e686

Please sign in to comment.