diff --git a/src/v/cluster/producer_state.cc b/src/v/cluster/producer_state.cc index 65bf18a61d962..bbcc2598528ef 100644 --- a/src/v/cluster/producer_state.cc +++ b/src/v/cluster/producer_state.cc @@ -16,10 +16,44 @@ namespace cluster { +std::ostream& operator<<(std::ostream& os, request_state state) { + switch (state) { + case request_state::initialized: + return os << "initialized"; + case request_state::in_progress: + return os << "in_progress"; + case request_state::completed: + return os << "completed"; + } +} + result_promise_t::future_type request::result() const { return _result.get_shared_future(); } +void request::set_value(request_result_t::value_type value) { + vassert( + _state <= request_state::in_progress && !_result.available(), + "unexpected request state during result set: {}", + *this); + _result.set_value(value); + _state = request_state::completed; +} + +void request::set_error(request_result_t::error_type error) { + // This is idempotent as different fibers can mark the result error + // at different times in some edge cases. + if (_state != request_state::completed) { + _result.set_value(error); + _state = request_state::completed; + return; + } + vassert( + _result.available() && result().get0().has_error(), + "Invalid result state, expected to be available and errored out: {}", + *this); +} + bool request::operator==(const request& other) const { bool compare = _first_sequence == other._first_sequence && _last_sequence == other._last_sequence @@ -108,7 +142,7 @@ result requests::try_emplace( // checks for sequence tracking. while (!_inflight_requests.empty()) { if (!_inflight_requests.front()->has_completed()) { - _inflight_requests.front()->set_value(errc::timeout); + _inflight_requests.front()->set_error(errc::timeout); } _inflight_requests.pop_front(); } @@ -122,7 +156,7 @@ result requests::try_emplace( if (!_inflight_requests.front()->has_completed()) { // Here we know for sure the term change, these in flight // requests are going to fail anyway, mark them so. - _inflight_requests.front()->set_value(errc::timeout); + _inflight_requests.front()->set_error(errc::timeout); } _inflight_requests.pop_front(); } @@ -203,7 +237,7 @@ bool requests::stm_apply( void requests::shutdown() { for (auto& request : _inflight_requests) { if (!request->has_completed()) { - request->_result.set_value(errc::shutting_down); + request->set_error(errc::shutting_down); } } _inflight_requests.clear(); @@ -233,6 +267,18 @@ bool producer_state::operator==(const producer_state& other) const { && _requests == other._requests; } +std::ostream& operator<<(std::ostream& o, const request& request) { + fmt::print( + o, + "{{ first: {}, last: {}, term: {}, result_available: {}, state: {} }}", + request._first_sequence, + request._last_sequence, + request._term, + request._result.available(), + request._state); + return o; +} + std::ostream& operator<<(std::ostream& o, const requests& requests) { fmt::print( o, @@ -286,8 +332,21 @@ result producer_state::try_emplace_request( current_term, reset, _requests); - return _requests.try_emplace( + + auto result = _requests.try_emplace( bid.first_seq, bid.last_seq, current_term, reset); + + if (unlikely(result.has_error())) { + vlog( + clusterlog.warn, + "[{}] error {} processing request {}, term: {}, reset: {}", + *this, + result.error(), + bid, + current_term, + reset); + } + return result; } bool producer_state::update( diff --git a/src/v/cluster/producer_state.h b/src/v/cluster/producer_state.h index 95cf278a4494b..d5f710c3e053e 100644 --- a/src/v/cluster/producer_state.h +++ b/src/v/cluster/producer_state.h @@ -46,7 +46,8 @@ using producer_ptr = ss::lw_shared_ptr; // right after set_value(), this is an implementation quirk, be // mindful of that behavior when using it. We have a test for // it in expiring_promise_test -using result_promise_t = ss::shared_promise>; +using request_result_t = result; +using result_promise_t = ss::shared_promise; using request_ptr = ss::lw_shared_ptr; using seq_t = int32_t; @@ -56,6 +57,8 @@ enum class request_state : uint8_t { completed = 2 }; +std::ostream& operator<<(std::ostream&, request_state); + /// A request for a given sequence range, both inclusive. /// The sequence numbers are stamped by the client and are a part /// of batch header. A request can either be in progress or completed @@ -73,23 +76,16 @@ class request { } } - template - void set_value(ValueType&& value) { - vassert( - _state <= request_state::in_progress && !_result.available(), - "unexpected request state during set: state: {}, result available: " - "{}", - static_cast>(_state), - _result.available()); - _result.set_value(std::forward(value)); - _state = request_state::completed; - } + void set_value(request_result_t::value_type); + void set_error(request_result_t::error_type); void mark_request_in_progress() { _state = request_state::in_progress; } request_state state() const { return _state; } result_promise_t::future_type result() const; bool operator==(const request&) const; + friend std::ostream& operator<<(std::ostream&, const request&); + private: request_state _state{request_state::initialized}; seq_t _first_sequence; diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 5ff5ee9036d3e..7a538389bcd72 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1065,7 +1065,7 @@ ss::future> rm_stm::do_transactional_replicate( auto expiration_it = _log_state.expiration.find(bid.pid); if (expiration_it == _log_state.expiration.end()) { vlog(_ctx_log.warn, "Can not find expiration info for pid:{}", bid.pid); - req_ptr->set_value(errc::generic_tx_error); + req_ptr->set_error(errc::generic_tx_error); co_return errc::generic_tx_error; } expiration_it->second.last_update = clock_type::now(); @@ -1079,7 +1079,7 @@ ss::future> rm_stm::do_transactional_replicate( "got {} on replicating tx data batch for pid:{}", r.error(), bid.pid); - req_ptr->set_value(r.error()); + req_ptr->set_error(r.error()); co_return r.error(); } if (!co_await wait_no_throw( @@ -1089,7 +1089,7 @@ ss::future> rm_stm::do_transactional_replicate( _ctx_log.warn, "application of the replicated tx batch has timed out pid:{}", bid.pid); - req_ptr->set_value(errc::timeout); + req_ptr->set_error(errc::timeout); co_return tx_errc::timeout; } _mem_state.estimated.erase(bid.pid); @@ -1179,7 +1179,6 @@ ss::future> rm_stm::do_idempotent_replicate( raft::replicate_options opts, ss::lw_shared_ptr> enqueued, ssx::semaphore_units& units) { - using ret_t = result; auto request = producer->try_emplace_request(bid, synced_term); if (!request) { co_return request.error(); @@ -1199,7 +1198,7 @@ ss::future> rm_stm::do_idempotent_replicate( _ctx_log.warn, "replication failed, request enqueue returned error: {}", req_enqueued.get_exception()); - req_ptr->set_value(errc::replication_error); + req_ptr->set_error(errc::replication_error); co_return errc::replication_error; } units.return_all(); @@ -1209,13 +1208,13 @@ ss::future> rm_stm::do_idempotent_replicate( if (replicated.failed()) { vlog( _ctx_log.warn, "replication failed: {}", replicated.get_exception()); - req_ptr->set_value(errc::replication_error); + req_ptr->set_error(errc::replication_error); co_return errc::replication_error; } auto result = replicated.get0(); if (result.has_error()) { vlog(_ctx_log.warn, "replication failed: {}", result.error()); - req_ptr->set_value(result.error()); + req_ptr->set_error(result.error()); co_return result.error(); } // translate to kafka offset.