diff --git a/src/v/cluster/members_manager.cc b/src/v/cluster/members_manager.cc index 6a8a72225fc1..8ffba26d1287 100644 --- a/src/v/cluster/members_manager.cc +++ b/src/v/cluster/members_manager.cc @@ -46,6 +46,7 @@ #include #include +#include #include namespace cluster { @@ -1509,7 +1510,6 @@ members_manager::dispatch_configuration_update(model::broker broker) { ss::future> members_manager::handle_configuration_update_request( configuration_update_request req) { - using ret_t = result; if (req.target_node != _self.id()) { vlog( clusterlog.warn, @@ -1517,7 +1517,7 @@ members_manager::handle_configuration_update_request( "configuration update.", _self, req.target_node); - return ss::make_ready_future(configuration_update_reply{false}); + co_return configuration_update_reply{false}; } vlog( clusterlog.trace, "Handling node {} configuration update", req.node.id()); @@ -1530,15 +1530,25 @@ members_manager::handle_configuration_update_request( err.value(), req.node, all_brokers); - return ss::make_ready_future(errc::invalid_configuration_update); + co_return errc::invalid_configuration_update; + } + + try { + co_await update_broker_client( + _self.id(), + _connection_cache, + req.node.id(), + req.node.rpc_address(), + _rpc_tls_config); + } catch (...) { + vlog( + clusterlog.warn, + "Unable to handle configuration update due to broker update error: " + "{}", + std::current_exception()); + co_return configuration_update_reply{false}; } - auto f = update_broker_client( - _self.id(), - _connection_cache, - req.node.id(), - req.node.rpc_address(), - _rpc_tls_config); // Current node is not the leader have to send an RPC to leader // controller std::optional leader_id = _raft0->get_leader_id(); @@ -1547,53 +1557,51 @@ members_manager::handle_configuration_update_request( clusterlog.warn, "Unable to handle configuration update, no leader controller", req.node.id()); - return ss::make_ready_future(errc::no_leader_controller); + co_return errc::no_leader_controller; } // curent node is a leader if (leader_id == _self.id()) { // Just update raft0 configuration - return update_node(std::move(req.node)).then([](std::error_code ec) { - if (ec) { - vlog( - clusterlog.warn, - "Unable to handle configuration update - {}", - ec.message()); - return ss::make_ready_future(ec); - } - return ss::make_ready_future( - configuration_update_reply{true}); - }); + std::error_code ec = co_await update_node(std::move(req.node)); + if (ec) { + vlog( + clusterlog.warn, + "Unable to handle configuration update - {}", + ec.message()); + co_return ec; + } + co_return configuration_update_reply{true}; } auto leader = _members_table.local().get_node_metadata_ref(*leader_id); if (!leader) { - return ss::make_ready_future(errc::no_leader_controller); + co_return errc::no_leader_controller; } - return with_client( - _self.id(), - _connection_cache, - *leader_id, - leader->get().broker.rpc_address(), - _rpc_tls_config, - _join_timeout, - [tout = ss::lowres_clock::now() + _join_timeout, - node = req.node, - target = *leader_id](controller_client_protocol c) mutable { - return c - .update_node_configuration( - configuration_update_request(std::move(node), target), - rpc::client_opts(tout)) - .then(&rpc::get_ctx_data); - }) - .handle_exception([](const std::exception_ptr& e) { - vlog( - clusterlog.warn, - "Error while dispatching configuration update request - {}", - e); - return ss::make_ready_future( - errc::join_request_dispatch_error); - }); + try { + co_return co_await with_client( + _self.id(), + _connection_cache, + *leader_id, + leader->get().broker.rpc_address(), + _rpc_tls_config, + _join_timeout, + [tout = ss::lowres_clock::now() + _join_timeout, + node = req.node, + target = *leader_id](controller_client_protocol c) mutable { + return c + .update_node_configuration( + configuration_update_request(std::move(node), target), + rpc::client_opts(tout)) + .then(&rpc::get_ctx_data); + }); + } catch (...) { + vlog( + clusterlog.warn, + "Error while dispatching configuration update request - {}", + std::current_exception()); + co_return errc::join_request_dispatch_error; + } } std::ostream&