Skip to content

Commit

Permalink
Merge pull request #14356 from rockwotj/proc-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj authored Oct 24, 2023
2 parents 894cd42 + a4bdccd commit 31e94ca
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 31 deletions.
12 changes: 10 additions & 2 deletions src/v/transform/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

namespace transform {

void probe::setup_metrics(ss::sstring transform_name, probe_gauges gauges) {
void probe::setup_metrics(ss::sstring transform_name) {
wasm::transform_probe::setup_metrics(transform_name);
namespace sm = ss::metrics;

Expand Down Expand Up @@ -60,7 +60,7 @@ void probe::setup_metrics(ss::sstring transform_name, probe_gauges gauges) {
metric_defs.emplace_back(
sm::make_gauge(
"processor_state",
[s, cb = gauges.num_processors] { return cb(s); },
[this, s] { return _processor_state[s]; },
sm::description(
"The count of transform processors in a certain state"),
state_labels)
Expand All @@ -72,4 +72,12 @@ void probe::setup_metrics(ss::sstring transform_name, probe_gauges gauges) {
/// Adds `bytes` to the `_write_bytes` counter.
void probe::increment_write_bytes(uint64_t bytes) {
    _write_bytes = _write_bytes + bytes;
}

/// Adds `bytes` to the `_read_bytes` counter.
void probe::increment_read_bytes(uint64_t bytes) {
    _read_bytes = _read_bytes + bytes;
}

/// Bumps the `_failures` counter by one.
void probe::increment_failure() {
    _failures += 1;
}
/**
 * Applies a processor state transition to the per-state processor counts.
 *
 * The count for `change.from` (when set) is decremented and the count for
 * `change.to` (when set) is incremented. Either side may be left empty, in
 * which case only the other side is applied.
 */
void probe::state_change(processor_state_change change) {
    if (change.from) {
        // The counts are unsigned: decrementing a state that has no recorded
        // processors would wrap around to a huge value and be reported by the
        // gauge. Only decrement when there is something to take away, and do
        // not create an entry just to decrement it.
        auto it = _processor_state.find(*change.from);
        if (it != _processor_state.end() && it->second > 0) {
            --it->second;
        }
    }
    if (change.to) {
        ++_processor_state[*change.to];
    }
}
} // namespace transform
15 changes: 11 additions & 4 deletions src/v/transform/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,33 @@
#include "model/transform.h"
#include "wasm/probe.h"

#include <absl/container/flat_hash_map.h>

namespace transform {

struct probe_gauges {
std::function<int64_t(model::transform_report::processor::state)>
num_processors;
struct processor_state_change {
using state = model::transform_report::processor::state;

std::optional<state> from;
std::optional<state> to;
};

/**
 * A per-transform probe.
 *
 * Extends wasm::transform_probe with counters for bytes read, bytes written
 * and failures, plus a count of processors per state.
 */
class probe : public wasm::transform_probe {
public:
// NOTE(review): two setup_metrics declarations are visible here; this looks
// like the before/after of a signature change rendered side by side - confirm
// which one the header is meant to keep.
void setup_metrics(ss::sstring transform_name, probe_gauges);
void setup_metrics(ss::sstring);

// Adds `bytes` to the read-bytes counter.
void increment_read_bytes(uint64_t bytes);
// Adds `bytes` to the write-bytes counter.
void increment_write_bytes(uint64_t bytes);
// Increments the failure counter by one.
void increment_failure();
// Applies a state transition to the per-state processor counts: the `from`
// state (if set) is decremented and the `to` state (if set) is incremented.
void state_change(processor_state_change);

private:
uint64_t _read_bytes = 0;
uint64_t _write_bytes = 0;
uint64_t _failures = 0;
// Number of processors currently counted in each state; read by the
// "processor_state" gauge.
absl::flat_hash_map<model::transform_report::processor::state, uint64_t>
_processor_state;
};

} // namespace transform
69 changes: 45 additions & 24 deletions src/v/transform/transform_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

namespace transform {

using state = model::transform_report::processor::state;
using namespace std::chrono_literals;

namespace {
Expand Down Expand Up @@ -103,35 +104,60 @@ class processor_table {
std::unique_ptr<transform::processor> processor,
ss::lw_shared_ptr<transform::probe> probe)
: _processor(std::move(processor))
, _probe(std::move(probe)) {}
, _probe(std::move(probe))
, _current_state(state::inactive) {
_probe->state_change({.to = _current_state});
}
entry_t(const entry_t&) = delete;
entry_t& operator=(const entry_t&) = delete;
// Move construction: takes over the processor, probe, backoff and current
// state of `other`. The source is left without a state so that it has
// nothing to release from the probe when it is destroyed.
entry_t(entry_t&& other) noexcept
  : _processor(std::move(other._processor))
  , _probe(std::move(other._probe))
  , _backoff(std::move(other._backoff))
  , _current_state(std::exchange(other._current_state, std::nullopt)) {}
// Move assignment. Before adopting the members of `e`, the state this entry
// was reporting is released from its probe; otherwise the overwritten state
// would stay counted in the probe's gauge forever. `e` is left without a state
// so that its destructor does not release the transferred state again.
entry_t& operator=(entry_t&& e) noexcept {
    if (this == &e) {
        // Nothing to transfer, and releasing our own state here would
        // under-count it.
        return *this;
    }
    // `_probe` may be empty if this entry was itself moved from earlier.
    if (_probe) {
        _probe->state_change({.from = _current_state});
    }
    _processor = std::move(e._processor);
    _probe = std::move(e._probe);
    _backoff = std::move(e._backoff);
    _current_state = std::exchange(e._current_state, std::nullopt);
    return *this;
}
// Releases this entry's current state from the probe's per-state counts.
~entry_t() {
    // A moved-from entry has handed its probe over to another entry, which
    // leaves `_probe` empty; dereferencing it would be undefined behaviour,
    // and there is nothing left to release anyway.
    if (_probe) {
        _probe->state_change({.from = _current_state});
    }
}

transform::processor* processor() const { return _processor.get(); }
const ss::lw_shared_ptr<transform::probe>& probe() const {
return _probe;
}

// Moves the entry into the errored state, reporting the transition to the
// probe, and returns the next delay produced by the backoff tracker.
ClockType::duration next_backoff_duration() {
    _is_errored = true;
    const auto next = state::errored;
    const std::optional<state> previous = _current_state;
    _probe->state_change({.from = previous, .to = next});
    _current_state = next;
    return _backoff.next_backoff_duration();
}
void mark_start_attempt() {
_is_errored = false;
_probe->state_change(
{.from = _current_state, .to = state::inactive});
_current_state = state::inactive;
_backoff.mark_start_attempt();
}

model::transform_report::processor::state compute_state() const {
using state = model::transform_report::processor::state;
if (_processor->is_running()) {
return state::running;
}
return _is_errored ? state::errored : state::inactive;
void mark_running() {
_probe->state_change(
{.from = _current_state, .to = state::running});
_current_state = state::running;
}
// The state this entry is currently counted under; empty once the entry has
// been moved from.
auto current_state() const -> std::optional<state> {
    return _current_state;
}

private:
std::unique_ptr<transform::processor> _processor;
ss::lw_shared_ptr<transform::probe> _probe;
processor_backoff<ClockType> _backoff;
bool _is_errored = false;
// This is optional so that the entry can be moved: a moved-from entry has
// its state reset to std::nullopt, so that its destructor does not
// mis-account any state in the probe.
std::optional<state> _current_state;
};

// Iterator pair (begin, end) over the entries of `_table`.
auto range() const {
    auto first = _table.begin();
    auto last = _table.end();
    return std::make_pair(first, last);
}
Expand All @@ -156,16 +182,7 @@ class processor_table {
return it->second.probe();
}
auto probe = ss::make_lw_shared<transform::probe>();
probe->setup_metrics(
meta.name(),
{
.num_processors =
[this](auto state) {
return absl::c_count_if(_table, [state](const auto& entry) {
return entry.second.compute_state() == state;
});
},
});
probe->setup_metrics(meta.name());
return probe;
}

Expand Down Expand Up @@ -348,8 +365,8 @@ ss::future<> manager<ClockType>::handle_transform_error(
co_return;
}
entry->probe()->increment_failure();
co_await entry->processor()->stop();
auto delay = entry->next_backoff_duration();
co_await entry->processor()->stop();
vlog(
tlog.info,
"transform {} errored on partition {}, delaying for {} then restarting",
Expand Down Expand Up @@ -426,6 +443,7 @@ ss::future<> manager<ClockType>::create_processor(
vlog(tlog.info, "starting transform {} on {}", meta.name, ntp);
entry.mark_start_attempt();
co_await entry.processor()->start();
entry.mark_running();
}
}

Expand All @@ -449,13 +467,16 @@ model::cluster_transform_report manager<ClockType>::compute_report() const {
const auto& entry = it->second;
processor* p = entry.processor();
auto id = p->ntp().tp.partition;

auto state = entry.current_state();
if (!state) {
continue;
}
report.add(
p->id(),
p->meta(),
{
.id = id,
.status = entry.compute_state(),
.status = *state,
.node = _self,
});
}
Expand Down
5 changes: 4 additions & 1 deletion src/v/transform/transform_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ class processor {
std::unique_ptr<source>,
std::vector<std::unique_ptr<sink>>,
probe*);

processor(const processor&) = delete;
processor(processor&&) = delete;
processor& operator=(const processor&) = delete;
processor& operator=(processor&&) = delete;
virtual ~processor() = default;
virtual ss::future<> start();
virtual ss::future<> stop();
Expand Down

0 comments on commit 31e94ca

Please sign in to comment.