From d3fc2b67483ba0849de62f05f3aa14206e557b03 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Tue, 11 Jun 2024 11:26:32 -0700 Subject: [PATCH 1/3] producer_state: fix printing of request's state (cherry picked from commit 5fe079a2fedac63c4cd6fef2618746856b48eb37) --- src/v/cluster/producer_state.cc | 23 +++++++++++++++++++++++ src/v/cluster/producer_state.h | 10 ++++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/producer_state.cc b/src/v/cluster/producer_state.cc index 65bf18a61d96..b813d4d59d85 100644 --- a/src/v/cluster/producer_state.cc +++ b/src/v/cluster/producer_state.cc @@ -16,6 +16,17 @@ namespace cluster { +std::ostream& operator<<(std::ostream& os, request_state state) { + switch (state) { + case request_state::initialized: + return os << "initialized"; + case request_state::in_progress: + return os << "in_progress"; + case request_state::completed: + return os << "completed"; + } +} + result_promise_t::future_type request::result() const { return _result.get_shared_future(); } @@ -233,6 +244,18 @@ bool producer_state::operator==(const producer_state& other) const { && _requests == other._requests; } +std::ostream& operator<<(std::ostream& o, const request& request) { + fmt::print( + o, + "{{ first: {}, last: {}, term: {}, result_available: {}, state: {} }}", + request._first_sequence, + request._last_sequence, + request._term, + request._result.available(), + request._state); + return o; +} + std::ostream& operator<<(std::ostream& o, const requests& requests) { fmt::print( o, diff --git a/src/v/cluster/producer_state.h b/src/v/cluster/producer_state.h index 95cf278a4494..993d6d844b19 100644 --- a/src/v/cluster/producer_state.h +++ b/src/v/cluster/producer_state.h @@ -56,6 +56,8 @@ enum class request_state : uint8_t { completed = 2 }; +std::ostream& operator<<(std::ostream&, request_state); + /// A request for a given sequence range, both inclusive. /// The sequence numbers are stamped by the client and are a part /// of batch header. A request can either be in progress or completed @@ -77,10 +79,8 @@ class request { void set_value(ValueType&& value) { vassert( _state <= request_state::in_progress && !_result.available(), - "unexpected request state during set: state: {}, result available: " - "{}", - static_cast>(_state), - _result.available()); + "unexpected request state during result set: {}", + *this); _result.set_value(std::forward(value)); _state = request_state::completed; } @@ -90,6 +90,8 @@ class request { bool operator==(const request&) const; + friend std::ostream& operator<<(std::ostream&, const request&); + private: request_state _state{request_state::initialized}; seq_t _first_sequence; From 12f46b20658d0c08b66c0f4568dd797330e2686f Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Tue, 11 Jun 2024 12:16:32 -0700 Subject: [PATCH 2/3] producer_state: bump OOOSN to warn level these are supposed to be rare and typically indicate a misbehaving broker or client. (cherry picked from commit 7579cf1633e1b8e2e031afacec777085ffb03811) --- src/v/cluster/producer_state.cc | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/producer_state.cc b/src/v/cluster/producer_state.cc index b813d4d59d85..623bc9cbb042 100644 --- a/src/v/cluster/producer_state.cc +++ b/src/v/cluster/producer_state.cc @@ -309,8 +309,21 @@ result producer_state::try_emplace_request( current_term, reset, _requests); - return _requests.try_emplace( + + auto result = _requests.try_emplace( bid.first_seq, bid.last_seq, current_term, reset); + + if (unlikely(result.has_error())) { + vlog( + clusterlog.warn, + "[{}] error {} processing request {}, term: {}, reset: {}", + *this, + result.error(), + bid, + current_term, + reset); + } + return result; } bool producer_state::update( From a1134772c0c2322904a5b0043fac7c00d891e9ef Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Tue, 11 Jun 2024 13:04:00 -0700 Subject: [PATCH 3/3] producer_state: add request::set_error and make it idempotent A request can be marked as errored multiple times, consider the example below. replicate_f : waiting for replication term change requests from old terms gc-ed: set_err(ec) replicate_f: set_err(ec) Current assert assumes that a request can be set only once, which is true for setting a successful result but not for errors. This commit splits set_value() into set_value() and set_error() and adjusts assert conditions accordingly. (cherry picked from commit 7719140e3ad316d3b8cde523b61f7601a7389bcf) --- src/v/cluster/producer_state.cc | 29 ++++++++++++++++++++++++++--- src/v/cluster/producer_state.h | 14 ++++---------- src/v/cluster/rm_stm.cc | 13 ++++++------- 3 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/v/cluster/producer_state.cc b/src/v/cluster/producer_state.cc index 623bc9cbb042..bbcc2598528e 100644 --- a/src/v/cluster/producer_state.cc +++ b/src/v/cluster/producer_state.cc @@ -31,6 +31,29 @@ result_promise_t::future_type request::result() const { return _result.get_shared_future(); } +void request::set_value(request_result_t::value_type value) { + vassert( + _state <= request_state::in_progress && !_result.available(), + "unexpected request state during result set: {}", + *this); + _result.set_value(value); + _state = request_state::completed; +} + +void request::set_error(request_result_t::error_type error) { + // This is idempotent as different fibers can mark the result error + // at different times in some edge cases. + if (_state != request_state::completed) { + _result.set_value(error); + _state = request_state::completed; + return; + } + vassert( + _result.available() && result().get0().has_error(), + "Invalid result state, expected to be available and errored out: {}", + *this); +} + bool request::operator==(const request& other) const { bool compare = _first_sequence == other._first_sequence && _last_sequence == other._last_sequence @@ -119,7 +142,7 @@ result requests::try_emplace( // checks for sequence tracking. while (!_inflight_requests.empty()) { if (!_inflight_requests.front()->has_completed()) { - _inflight_requests.front()->set_value(errc::timeout); + _inflight_requests.front()->set_error(errc::timeout); } _inflight_requests.pop_front(); } @@ -133,7 +156,7 @@ result requests::try_emplace( if (!_inflight_requests.front()->has_completed()) { // Here we know for sure the term change, these in flight // requests are going to fail anyway, mark them so. - _inflight_requests.front()->set_value(errc::timeout); + _inflight_requests.front()->set_error(errc::timeout); } _inflight_requests.pop_front(); } @@ -214,7 +237,7 @@ bool requests::stm_apply( void requests::shutdown() { for (auto& request : _inflight_requests) { if (!request->has_completed()) { - request->_result.set_value(errc::shutting_down); + request->set_error(errc::shutting_down); } } _inflight_requests.clear(); diff --git a/src/v/cluster/producer_state.h b/src/v/cluster/producer_state.h index 993d6d844b19..d5f710c3e053 100644 --- a/src/v/cluster/producer_state.h +++ b/src/v/cluster/producer_state.h @@ -46,7 +46,8 @@ using producer_ptr = ss::lw_shared_ptr; // right after set_value(), this is an implementation quirk, be // mindful of that behavior when using it. We have a test for // it in expiring_promise_test -using result_promise_t = ss::shared_promise>; +using request_result_t = result; +using result_promise_t = ss::shared_promise; using request_ptr = ss::lw_shared_ptr; using seq_t = int32_t; @@ -75,15 +76,8 @@ class request { } } - template - void set_value(ValueType&& value) { - vassert( - _state <= request_state::in_progress && !_result.available(), - "unexpected request state during result set: {}", - *this); - _result.set_value(std::forward(value)); - _state = request_state::completed; - } + void set_value(request_result_t::value_type); + void set_error(request_result_t::error_type); void mark_request_in_progress() { _state = request_state::in_progress; } request_state state() const { return _state; } result_promise_t::future_type result() const; diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 5ff5ee9036d3..7a538389bcd7 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1065,7 +1065,7 @@ ss::future> rm_stm::do_transactional_replicate( auto expiration_it = _log_state.expiration.find(bid.pid); if (expiration_it == _log_state.expiration.end()) { vlog(_ctx_log.warn, "Can not find expiration info for pid:{}", bid.pid); - req_ptr->set_value(errc::generic_tx_error); + req_ptr->set_error(errc::generic_tx_error); co_return errc::generic_tx_error; } expiration_it->second.last_update = clock_type::now(); @@ -1079,7 +1079,7 @@ ss::future> rm_stm::do_transactional_replicate( "got {} on replicating tx data batch for pid:{}", r.error(), bid.pid); - req_ptr->set_value(r.error()); + req_ptr->set_error(r.error()); co_return r.error(); } if (!co_await wait_no_throw( @@ -1089,7 +1089,7 @@ ss::future> rm_stm::do_transactional_replicate( _ctx_log.warn, "application of the replicated tx batch has timed out pid:{}", bid.pid); - req_ptr->set_value(errc::timeout); + req_ptr->set_error(errc::timeout); co_return tx_errc::timeout; } _mem_state.estimated.erase(bid.pid); @@ -1179,7 +1179,6 @@ ss::future> rm_stm::do_idempotent_replicate( raft::replicate_options opts, ss::lw_shared_ptr> enqueued, ssx::semaphore_units& units) { - using ret_t = result; auto request = producer->try_emplace_request(bid, synced_term); if (!request) { co_return request.error(); @@ -1199,7 +1198,7 @@ ss::future> rm_stm::do_idempotent_replicate( _ctx_log.warn, "replication failed, request enqueue returned error: {}", req_enqueued.get_exception()); - req_ptr->set_value(errc::replication_error); + req_ptr->set_error(errc::replication_error); co_return errc::replication_error; } units.return_all(); @@ -1209,13 +1208,13 @@ ss::future> rm_stm::do_idempotent_replicate( if (replicated.failed()) { vlog( _ctx_log.warn, "replication failed: {}", replicated.get_exception()); - req_ptr->set_value(errc::replication_error); + req_ptr->set_error(errc::replication_error); co_return errc::replication_error; } auto result = replicated.get0(); if (result.has_error()) { vlog(_ctx_log.warn, "replication failed: {}", result.error()); - req_ptr->set_value(result.error()); + req_ptr->set_error(result.error()); co_return result.error(); } // translate to kafka offset.