From 35fa28ba002a4511b33991099fc5d488a65a1a1e Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 23 Jul 2024 14:46:29 +0200 Subject: [PATCH] c/metrics_reporter: ensure http client is stopped before destroying In case of exceptions, destroying the client may not be safe because the underlying output stream may not be fully flushed. --- src/v/cluster/metrics_reporter.cc | 35 +++++++++++++++++-------------- src/v/cluster/metrics_reporter.h | 1 + 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/src/v/cluster/metrics_reporter.cc b/src/v/cluster/metrics_reporter.cc index 13467346639d..0a73e0a47205 100644 --- a/src/v/cluster/metrics_reporter.cc +++ b/src/v/cluster/metrics_reporter.cc @@ -398,6 +398,21 @@ ss::future metrics_reporter::make_http_client() { co_return http::client(client_configuration, _as.local()); } +ss::future<> +metrics_reporter::do_send_metrics(http::client& client, iobuf body) { + auto timeout = config::shard_local_cfg().metrics_reporter_tick_interval(); + auto res = co_await client.get_connected(timeout, _logger); + // skip sending metrics, unable to connect + if (res != http::reconnect_result_t::connected) { + vlog( + _logger.trace, "unable to send metrics report, connection timeout"); + co_return; + } + auto resp_stream = co_await client.post( + _address.path, std::move(body), http::content_type::json, timeout); + co_await resp_stream->prefetch_headers(); +} + ss::future<> metrics_reporter::do_report_metrics() { // try initializing cluster info, if it is already present this operation // does nothing. @@ -453,22 +468,10 @@ ss::future<> metrics_reporter::do_report_metrics() { } auto out = serialize_metrics_snapshot(snapshot.value()); try { - // prepare http client - auto client = co_await make_http_client(); - auto timeout - = config::shard_local_cfg().metrics_reporter_tick_interval(); - auto res = co_await client.get_connected(timeout, _logger); - // skip sending metrics, unable to connect - if (res != http::reconnect_result_t::connected) { - vlog( - _logger.trace, - "unable to send metrics report, connection timeout"); - co_return; - } - auto resp_stream = co_await client.post( - _address.path, std::move(out), http::content_type::json, timeout); - co_await resp_stream->prefetch_headers(); - co_await resp_stream->shutdown(); + co_await http::with_client( + co_await make_http_client(), [this, &out](http::client& client) { + return do_send_metrics(client, std::move(out)); + }); _last_success = ss::lowres_clock::now(); } catch (...) { vlog( diff --git a/src/v/cluster/metrics_reporter.h b/src/v/cluster/metrics_reporter.h index 326c7322db54..9343276927df 100644 --- a/src/v/cluster/metrics_reporter.h +++ b/src/v/cluster/metrics_reporter.h @@ -103,6 +103,7 @@ class metrics_reporter { ss::future> build_metrics_snapshot(); ss::future make_http_client(); + ss::future<> do_send_metrics(http::client&, iobuf body); ss::future<> try_initialize_cluster_info(); ss::future<> propagate_cluster_id();