c/members_manager: await update_broker_client

A new clang-tidy check I'm developing caught that we're not waiting for
the future `f` to finish. I've converted the function into a coroutine and
co_awaited this future, which I assume is the intended semantics.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
(cherry picked from commit da3d491)
rockwotj authored and vbotbuildovich committed Dec 19, 2023
1 parent ed23273 commit 4a0f8c3
Showing 1 changed file with 54 additions and 46 deletions.
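
For context, the fix follows the usual Seastar pattern: a handler that builds a future and never waits for it can return before the work finishes, and any exception the future carries is lost (at best Seastar reports an ignored exceptional future at runtime). Converting the handler into a coroutine and co_awaiting the call propagates both completion and failure. Below is a minimal sketch of the before/after shape, assuming Seastar is available; the names are illustrative stand-ins, not the actual members_manager code.

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

// Hypothetical stand-in for update_broker_client(); assume it may throw.
seastar::future<> update_client() { return seastar::make_ready_future<>(); }

// Before: `f` is created but never awaited, so the handler can finish
// before the update does, and a failure in it is silently dropped.
seastar::future<bool> handle_update_before() {
    auto f = update_client();
    return seastar::make_ready_future<bool>(true);
}

// After: the handler is a coroutine; co_await suspends until the update
// completes and rethrows its exception, which the handler can report.
seastar::future<bool> handle_update_after() {
    try {
        co_await update_client();
    } catch (...) {
        co_return false; // surface the failure instead of dropping it
    }
    co_return true;
}

The commit below applies the same shape to handle_configuration_update_request, replacing the unused future `f` with a co_await wrapped in try/catch.
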
src/v/cluster/members_manager.cc (100 changes: 54 additions & 46 deletions)
@@ -46,6 +46,7 @@
 #include <fmt/ranges.h>
 
 #include <chrono>
+#include <exception>
 #include <system_error>
 namespace cluster {

@@ -1509,15 +1510,14 @@ members_manager::dispatch_configuration_update(model::broker broker) {
 ss::future<result<configuration_update_reply>>
 members_manager::handle_configuration_update_request(
   configuration_update_request req) {
-    using ret_t = result<configuration_update_reply>;
     if (req.target_node != _self.id()) {
         vlog(
           clusterlog.warn,
           "Current node id {} is different than requested target: {}. Ignoring "
           "configuration update.",
           _self,
           req.target_node);
-        return ss::make_ready_future<ret_t>(configuration_update_reply{false});
+        co_return configuration_update_reply{false};
     }
     vlog(
       clusterlog.trace, "Handling node {} configuration update", req.node.id());
@@ -1530,15 +1530,25 @@ members_manager::handle_configuration_update_request(
           err.value(),
           req.node,
           all_brokers);
-        return ss::make_ready_future<ret_t>(errc::invalid_configuration_update);
+        co_return errc::invalid_configuration_update;
     }
 
+    try {
+        co_await update_broker_client(
+          _self.id(),
+          _connection_cache,
+          req.node.id(),
+          req.node.rpc_address(),
+          _rpc_tls_config);
+    } catch (...) {
+        vlog(
+          clusterlog.warn,
+          "Unable to handle configuration update due to broker update error: "
+          "{}",
+          std::current_exception());
+        co_return configuration_update_reply{false};
+    }
+
-    auto f = update_broker_client(
-      _self.id(),
-      _connection_cache,
-      req.node.id(),
-      req.node.rpc_address(),
-      _rpc_tls_config);
     // Current node is not the leader have to send an RPC to leader
     // controller
     std::optional<model::node_id> leader_id = _raft0->get_leader_id();
@@ -1547,53 +1557,51 @@ members_manager::handle_configuration_update_request(
           clusterlog.warn,
           "Unable to handle configuration update, no leader controller",
           req.node.id());
-        return ss::make_ready_future<ret_t>(errc::no_leader_controller);
+        co_return errc::no_leader_controller;
     }
     // curent node is a leader
     if (leader_id == _self.id()) {
         // Just update raft0 configuration
-        return update_node(std::move(req.node)).then([](std::error_code ec) {
-            if (ec) {
-                vlog(
-                  clusterlog.warn,
-                  "Unable to handle configuration update - {}",
-                  ec.message());
-                return ss::make_ready_future<ret_t>(ec);
-            }
-            return ss::make_ready_future<ret_t>(
-              configuration_update_reply{true});
-        });
+        std::error_code ec = co_await update_node(std::move(req.node));
+        if (ec) {
+            vlog(
+              clusterlog.warn,
+              "Unable to handle configuration update - {}",
+              ec.message());
+            co_return ec;
+        }
+        co_return configuration_update_reply{true};
     }
 
     auto leader = _members_table.local().get_node_metadata_ref(*leader_id);
     if (!leader) {
-        return ss::make_ready_future<ret_t>(errc::no_leader_controller);
+        co_return errc::no_leader_controller;
     }
 
-    return with_client<controller_client_protocol>(
-             _self.id(),
-             _connection_cache,
-             *leader_id,
-             leader->get().broker.rpc_address(),
-             _rpc_tls_config,
-             _join_timeout,
-             [tout = ss::lowres_clock::now() + _join_timeout,
-              node = req.node,
-              target = *leader_id](controller_client_protocol c) mutable {
-                 return c
-                   .update_node_configuration(
-                     configuration_update_request(std::move(node), target),
-                     rpc::client_opts(tout))
-                   .then(&rpc::get_ctx_data<configuration_update_reply>);
-             })
-      .handle_exception([](const std::exception_ptr& e) {
-          vlog(
-            clusterlog.warn,
-            "Error while dispatching configuration update request - {}",
-            e);
-          return ss::make_ready_future<ret_t>(
-            errc::join_request_dispatch_error);
-      });
+    try {
+        co_return co_await with_client<controller_client_protocol>(
+          _self.id(),
+          _connection_cache,
+          *leader_id,
+          leader->get().broker.rpc_address(),
+          _rpc_tls_config,
+          _join_timeout,
+          [tout = ss::lowres_clock::now() + _join_timeout,
+           node = req.node,
+           target = *leader_id](controller_client_protocol c) mutable {
+              return c
+                .update_node_configuration(
+                  configuration_update_request(std::move(node), target),
+                  rpc::client_opts(tout))
+                .then(&rpc::get_ctx_data<configuration_update_reply>);
+          });
+    } catch (...) {
+        vlog(
+          clusterlog.warn,
+          "Error while dispatching configuration update request - {}",
+          std::current_exception());
+        co_return errc::join_request_dispatch_error;
+    }
 }
 
 std::ostream&