Skip to content

Commit

Permalink
treewide: Mark all probes as immovable
Browse files Browse the repository at this point in the history
Probes register metrics by capturing `this`, and it's not save to move
them after that. In a lot of places this is safe because their lifetime
is directly tied to a service which lives the whole program's lifetime,
but any small move of that object even during initialization can break
things (see redpanda-data#11155, redpanda-data#11095, redpanda-data#11107).

This takes a big hammer approach to removing this foot gun by making all
probes immovable and wrapping them in `std::unique_ptr`.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
  • Loading branch information
rockwotj committed Jun 2, 2023
1 parent f5de480 commit 785f790
Show file tree
Hide file tree
Showing 27 changed files with 154 additions and 75 deletions.
11 changes: 11 additions & 0 deletions src/v/archival/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ namespace archival {
class ntp_level_probe {
public:
ntp_level_probe(per_ntp_metrics_disabled disabled, const model::ntp& ntp);
ntp_level_probe(const ntp_level_probe&) = delete;
ntp_level_probe& operator=(const ntp_level_probe&) = delete;
ntp_level_probe(ntp_level_probe&&) = delete;
ntp_level_probe& operator=(ntp_level_probe&&) = delete;
~ntp_level_probe() = default;

void setup_ntp_metrics(const model::ntp& ntp);

Expand Down Expand Up @@ -101,6 +106,12 @@ class upload_housekeeping_probe {

public:
upload_housekeeping_probe();
upload_housekeeping_probe(const upload_housekeeping_probe&) = delete;
upload_housekeeping_probe& operator=(const upload_housekeeping_probe&)
= delete;
upload_housekeeping_probe(upload_housekeeping_probe&&) = delete;
upload_housekeeping_probe& operator=(upload_housekeeping_probe&&) = delete;
~upload_housekeeping_probe() = default;

// These metrics are updated by the service
void housekeeping_rounds(uint64_t add) { _housekeeping_rounds += add; }
Expand Down
6 changes: 6 additions & 0 deletions src/v/cloud_roles/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ class auth_refresh_probe {
void fetch_success() { ++_successful_fetches; }
void fetch_failed() { ++_fetch_errors; }

auth_refresh_probe(const auth_refresh_probe&) = delete;
auth_refresh_probe& operator=(const auth_refresh_probe&) = delete;
auth_refresh_probe(auth_refresh_probe&&) = delete;
auth_refresh_probe& operator=(auth_refresh_probe&&) = delete;
~auth_refresh_probe() = default;

private:
uint64_t _successful_fetches{0};
uint64_t _fetch_errors{0};
Expand Down
12 changes: 6 additions & 6 deletions src/v/cloud_roles/refresh_credentials.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ refresh_credentials::refresh_credentials(
, _region{std::move(region)} {}

void refresh_credentials::start() {
_probe.setup_metrics();
_probe->setup_metrics();
ssx::background = ssx::spawn_with_gate_then(
_gate, [this]() { return do_start(); });
}
Expand Down Expand Up @@ -180,7 +180,7 @@ ss::future<> refresh_credentials::fetch_and_update_credentials() {
std::move(handle_result),
[this](malformed_api_response_error err) {
_impl->increment_retries();
_probe.fetch_failed();
_probe->fetch_failed();
vlog(
clrl_log.error,
"bad api response, missing fields: {}",
Expand All @@ -189,13 +189,13 @@ ss::future<> refresh_credentials::fetch_and_update_credentials() {
},
[this](api_response_parse_error err) {
_impl->increment_retries();
_probe.fetch_failed();
_probe->fetch_failed();
vlog(clrl_log.error, "failed to parse api response: {}", err.reason);
return ss::now();
},
[this](api_request_error err) {
_impl->increment_retries();
_probe.fetch_failed();
_probe->fetch_failed();
vlog(
clrl_log.error,
"api request failed (retrying after cool-off period): {}",
Expand All @@ -204,7 +204,7 @@ ss::future<> refresh_credentials::fetch_and_update_credentials() {
},
[this](credentials creds) {
_impl->reset_retries();
_probe.fetch_success();
_probe->fetch_success();
vlog(clrl_log.info, "fetched credentials {}", creds);
return _credentials_update(std::move(creds));
});
Expand All @@ -215,7 +215,7 @@ ss::future<> refresh_credentials::stop() {
_as.request_abort();
}
co_await _gate.close();
_probe.reset();
_probe->reset();
}

std::chrono::milliseconds
Expand Down
2 changes: 1 addition & 1 deletion src/v/cloud_roles/refresh_credentials.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class refresh_credentials {
ss::abort_source& _as;
credentials_update_cb_t _credentials_update;
aws_region_name _region;
auth_refresh_probe _probe;
std::unique_ptr<auth_refresh_probe> _probe;
};

std::ostream& operator<<(std::ostream& os, const refresh_credentials& rc);
Expand Down
5 changes: 5 additions & 0 deletions src/v/cloud_storage/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class remote_probe {
remote_metrics_disabled disabled,
remote_metrics_disabled public_disabled,
materialized_segments&);
remote_probe(const remote_probe&) = delete;
remote_probe& operator=(const remote_probe&) = delete;
remote_probe(remote_probe&&) = delete;
remote_probe& operator=(remote_probe&&) = delete;
~remote_probe() = default;

/// Register topic manifest upload
void topic_manifest_upload() { _cnt_topic_manifest_uploads++; }
Expand Down
11 changes: 11 additions & 0 deletions src/v/kafka/group_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class group_offset_probe {
explicit group_offset_probe(model::offset& offset) noexcept
: _offset(offset)
, _public_metrics(ssx::metrics::public_metrics_handle) {}
group_offset_probe(const group_offset_probe&) = delete;
group_offset_probe& operator=(const group_offset_probe&) = delete;
group_offset_probe(group_offset_probe&&) = delete;
group_offset_probe& operator=(group_offset_probe&&) = delete;
~group_offset_probe() = default;

void setup_metrics(
const kafka::group_id& group_id, const model::topic_partition& tp) {
Expand Down Expand Up @@ -103,6 +108,12 @@ class group_probe {
, _offsets(offsets)
, _public_metrics(ssx::metrics::public_metrics_handle) {}

group_probe(const group_probe&) = delete;
group_probe& operator=(const group_probe&) = delete;
group_probe(group_probe&&) = delete;
group_probe& operator=(group_probe&&) = delete;
~group_probe() = default;

void setup_public_metrics(const kafka::group_id& group_id) {
namespace sm = ss::metrics;

Expand Down
7 changes: 7 additions & 0 deletions src/v/kafka/latency_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
namespace kafka {
class latency_probe {
public:
latency_probe() = default;
latency_probe(const latency_probe&) = delete;
latency_probe& operator=(const latency_probe&) = delete;
latency_probe(latency_probe&&) = delete;
latency_probe& operator=(latency_probe&&) = delete;
~latency_probe() = default;

void setup_metrics() {
namespace sm = ss::metrics;

Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ server::server(
if (qdc_config) {
_qdc_mon.emplace(*qdc_config);
}
_probe.setup_metrics();
_probe.setup_public_metrics();
_probe->setup_metrics();
_probe->setup_public_metrics();
}

ss::scheduling_group server::fetch_scheduling_group() const {
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class server final : public net::server {
return _gssapi_principal_mapper;
}

latency_probe& latency_probe() { return _probe; }
latency_probe& latency_probe() { return *_probe; }

ssx::thread_worker& thread_worker() { return _thread_worker; }

Expand Down Expand Up @@ -212,7 +212,7 @@ class server final : public net::server {

handler_probe_manager _handler_probes;

class latency_probe _probe;
std::unique_ptr<class latency_probe> _probe;
ssx::thread_worker& _thread_worker;
std::unique_ptr<replica_selector> _replica_selector;
const std::unique_ptr<pandaproxy::schema_registry::api>& _schema_registry;
Expand Down
7 changes: 7 additions & 0 deletions src/v/net/client_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@
namespace net {
class client_probe {
public:
client_probe() = default;
client_probe(const client_probe&) = delete;
client_probe& operator=(const client_probe&) = delete;
client_probe(client_probe&&) = delete;
client_probe& operator=(client_probe&&) = delete;
~client_probe() = default;

void request() {
++_requests;
++_requests_pending;
Expand Down
12 changes: 6 additions & 6 deletions src/v/net/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ server::~server() = default;
void server::start() {
if (!cfg.disable_metrics) {
setup_metrics();
_probe.setup_metrics(_metrics, cfg.name.c_str());
_probe->setup_metrics(_metrics, cfg.name.c_str());
}

if (!cfg.disable_public_metrics) {
setup_public_metrics();
_probe.setup_public_metrics(_public_metrics, cfg.name.c_str());
_probe->setup_public_metrics(_public_metrics, cfg.name.c_str());
}

if (cfg.connection_rate_bindings) {
Expand All @@ -60,7 +60,7 @@ void server::start() {
.overrides
= connection_rate_bindings.value().config_overrides_rate()};

_connection_rates.emplace(std::move(info), _conn_gate, _probe);
_connection_rates.emplace(std::move(info), _conn_gate, *_probe);

connection_rate_bindings.value().config_general_rate.watch([this] {
_connection_rates->update_general_rate(
Expand Down Expand Up @@ -200,7 +200,7 @@ server::accept_finish(ss::sstring name, ss::future<ss::accept_result> f_cs_sa) {
ar.remote_address.addr());
if (!cq_units.live()) {
// Connection limit hit, drop this connection.
_probe.connection_rejected();
_probe->connection_rejected();
vlog(
_log.info,
"Connection limit reached, rejecting {}",
Expand Down Expand Up @@ -233,7 +233,7 @@ server::accept_finish(ss::sstring name, ss::future<ss::accept_result> f_cs_sa) {
"Timeout while waiting free token for connection rate. "
"addr:{}",
ar.remote_address);
_probe.timeout_waiting_rate_limit();
_probe->timeout_waiting_rate_limit();
co_return ss::stop_iteration::no;
}
}
Expand All @@ -243,7 +243,7 @@ server::accept_finish(ss::sstring name, ss::future<ss::accept_result> f_cs_sa) {
name,
std::move(ar.connection),
ar.remote_address,
_probe,
*_probe,
cfg.stream_recv_buf);
vlog(
_log.trace,
Expand Down
4 changes: 2 additions & 2 deletions src/v/net/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class server {
virtual std::string_view name() const = 0;
virtual ss::future<> apply(ss::lw_shared_ptr<net::connection>) = 0;

server_probe& probe() { return _probe; }
server_probe& probe() { return *_probe; }
ssx::semaphore& memory() { return _memory; }
ss::gate& conn_gate() { return _conn_gate; }
hdr_hist& hist() { return _hist; }
Expand Down Expand Up @@ -163,7 +163,7 @@ class server {
ss::abort_source _as;
ss::gate _conn_gate;
hdr_hist _hist;
server_probe _probe;
std::unique_ptr<server_probe> _probe;
ss::metrics::metric_groups _metrics;
ss::metrics::metric_groups _public_metrics;

Expand Down
7 changes: 7 additions & 0 deletions src/v/net/server_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ namespace net {

class server_probe {
public:
server_probe() = default;
server_probe(const server_probe&) = delete;
server_probe& operator=(const server_probe&) = delete;
server_probe(server_probe&&) = delete;
server_probe& operator=(server_probe&&) = delete;
~server_probe() = default;

void connection_established() {
++_connects;
++_connections;
Expand Down
4 changes: 2 additions & 2 deletions src/v/net/transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ ss::future<> base_transport::do_connect(clock_type::time_point timeout) {
_tls_sni_hostname ? *_tls_sni_hostname : ss::sstring{});
}
_fd = std::make_unique<ss::connected_socket>(std::move(fd));
_probe.connection_established();
_probe->connection_established();
_in = _fd->input();

// Never implicitly destroy a live output stream here: output streams
Expand All @@ -63,7 +63,7 @@ ss::future<> base_transport::do_connect(clock_type::time_point timeout) {
_out = net::batched_output_stream(_fd->output());
} catch (...) {
auto e = std::current_exception();
_probe.connection_error(e);
_probe->connection_error(e);
std::rethrow_exception(e);
}

Expand Down
2 changes: 1 addition & 1 deletion src/v/net/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class base_transport {
ss::input_stream<char> _in;
net::batched_output_stream _out;
ss::gate _dispatch_gate;
client_probe _probe;
std::unique_ptr<client_probe> _probe;

private:
ss::future<> do_connect(clock_type::time_point);
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ class probe {
public:
probe(
ss::httpd::path_description& path_desc, const ss::sstring& group_name);
probe(const probe&) = delete;
probe& operator=(const probe&) = delete;
probe(probe&&) = delete;
probe& operator=(probe&&) = delete;
~probe() = default;
auto auto_measure() { return _request_metrics.auto_measure(); }

private:
Expand Down
Loading

0 comments on commit 785f790

Please sign in to comment.