[v24.1.x] producer_state: add request::set_error and make it idempotent #21524

Merged · 3 commits · Jul 22, 2024
67 changes: 63 additions & 4 deletions src/v/cluster/producer_state.cc
@@ -16,10 +16,44 @@

namespace cluster {

std::ostream& operator<<(std::ostream& os, request_state state) {
switch (state) {
case request_state::initialized:
return os << "initialized";
case request_state::in_progress:
return os << "in_progress";
case request_state::completed:
return os << "completed";
}
}

result_promise_t::future_type request::result() const {
return _result.get_shared_future();
}

void request::set_value(request_result_t::value_type value) {
vassert(
_state <= request_state::in_progress && !_result.available(),
"unexpected request state during result set: {}",
*this);
_result.set_value(value);
_state = request_state::completed;
}

void request::set_error(request_result_t::error_type error) {
// This is idempotent because, in some edge cases, different fibers
// can mark the result as errored at different times.
if (_state != request_state::completed) {
_result.set_value(error);
_state = request_state::completed;
return;
}
vassert(
_result.available() && result().get0().has_error(),
"Invalid result state, expected to be available and errored out: {}",
*this);
}
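
A minimal standalone sketch of the idempotency contract above, using stand-in types (std::optional in place of ss::shared_promise; all names are illustrative, not the Redpanda classes):

```cpp
#include <cassert>
#include <optional>

enum class errc { timeout, shutting_down };
enum class request_state { initialized, in_progress, completed };

struct toy_request {
    request_state _state{request_state::initialized};
    std::optional<errc> _error; // stand-in for the shared promise

    void set_error(errc error) {
        // First completion wins and transitions the state machine.
        if (_state != request_state::completed) {
            _error = error;
            _state = request_state::completed;
            return;
        }
        // Any later caller must find the request already errored out,
        // mirroring the vassert in the real implementation.
        assert(_error.has_value());
    }
};

int main() {
    toy_request r;
    r.set_error(errc::timeout);        // e.g. the expiry fiber fires first
    r.set_error(errc::shutting_down);  // e.g. shutdown races in: a no-op
    assert(r._error == errc::timeout); // the first error sticks
}
```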

bool request::operator==(const request& other) const {
bool compare = _first_sequence == other._first_sequence
&& _last_sequence == other._last_sequence
@@ -108,7 +142,7 @@ result<request_ptr> requests::try_emplace(
// checks for sequence tracking.
while (!_inflight_requests.empty()) {
if (!_inflight_requests.front()->has_completed()) {
_inflight_requests.front()->set_value(errc::timeout);
_inflight_requests.front()->set_error(errc::timeout);
}
_inflight_requests.pop_front();
}
@@ -122,7 +156,7 @@ result<request_ptr> requests::try_emplace(
if (!_inflight_requests.front()->has_completed()) {
// Here we know for sure the term changed; these in-flight
// requests are going to fail anyway, so mark them as failed.
_inflight_requests.front()->set_value(errc::timeout);
_inflight_requests.front()->set_error(errc::timeout);
}
_inflight_requests.pop_front();
}
@@ -203,7 +237,7 @@ bool requests::stm_apply(
void requests::shutdown() {
for (auto& request : _inflight_requests) {
if (!request->has_completed()) {
request->_result.set_value(errc::shutting_down);
request->set_error(errc::shutting_down);
}
}
_inflight_requests.clear();
@@ -233,6 +267,18 @@ bool producer_state::operator==(const producer_state& other) const {
&& _requests == other._requests;
}

std::ostream& operator<<(std::ostream& o, const request& request) {
fmt::print(
o,
"{{ first: {}, last: {}, term: {}, result_available: {}, state: {} }}",
request._first_sequence,
request._last_sequence,
request._term,
request._result.available(),
request._state);
return o;
}

std::ostream& operator<<(std::ostream& o, const requests& requests) {
fmt::print(
o,
@@ -286,8 +332,21 @@ result<request_ptr> producer_state::try_emplace_request(
current_term,
reset,
_requests);
return _requests.try_emplace(

auto result = _requests.try_emplace(
bid.first_seq, bid.last_seq, current_term, reset);

if (unlikely(result.has_error())) {
vlog(
clusterlog.warn,
"[{}] error {} processing request {}, term: {}, reset: {}",
*this,
result.error(),
bid,
current_term,
reset);
}
return result;
}
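
The change above keeps try_emplace's behavior but routes the outcome through a local so the error branch can be logged at warn before propagating. The same check-then-log pattern, sketched with std::expected (C++23) standing in for the tree's result<T>; the errc values here are illustrative:

```cpp
#include <expected>
#include <iostream>

enum class errc { invalid_producer_epoch, timeout };

// Stand-in for result<request_ptr>: either a handle or an error code.
std::expected<int, errc> try_emplace(bool fail) {
    if (fail) {
        return std::unexpected(errc::invalid_producer_epoch);
    }
    return 42; // stand-in for the emplaced request pointer
}

int main() {
    auto result = try_emplace(/*fail=*/true);
    if (!result.has_value()) { // the unlikely() warn-log branch above
        std::cerr << "error " << static_cast<int>(result.error())
                  << " processing request\n";
        return 1;
    }
    std::cout << "request: " << *result << '\n';
    return 0;
}
```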

bool producer_state::update(
20 changes: 8 additions & 12 deletions src/v/cluster/producer_state.h
@@ -46,7 +46,8 @@ using producer_ptr = ss::lw_shared_ptr<producer_state>;
// right after set_value(). This is an implementation quirk; be
// mindful of that behavior when using it. We have a test for
// it in expiring_promise_test.
using result_promise_t = ss::shared_promise<result<kafka_result>>;
using request_result_t = result<kafka_result>;
using result_promise_t = ss::shared_promise<request_result_t>;
using request_ptr = ss::lw_shared_ptr<request>;
using seq_t = int32_t;
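
The quirk that comment warns about: continuations chained on the shared future may run inline, inside set_value(), before the completing fiber's next statement. A toy model of the hazard in plain C++ (no Seastar; the callback stands in for a chained continuation):

```cpp
#include <functional>
#include <iostream>

struct toy_shared_promise {
    std::function<void(int)> _continuation;
    void then(std::function<void(int)> f) { _continuation = std::move(f); }
    void set_value(int v) {
        // The continuation runs before set_value() returns.
        if (_continuation) {
            _continuation(v);
        }
    }
};

int main() {
    bool completed = false; // mirrors `_state = request_state::completed`
    toy_shared_promise p;
    p.then([&](int v) {
        // Observes the pre-set_value() state: prints `false`.
        std::cout << "value " << v << ", completed flag seen: "
                  << std::boolalpha << completed << '\n';
    });
    p.set_value(42);
    completed = true; // too late for the continuation above
}
```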

@@ -56,6 +57,8 @@ enum class request_state : uint8_t {
completed = 2
};

std::ostream& operator<<(std::ostream&, request_state);

/// A request for a given sequence range, both inclusive.
/// The sequence numbers are stamped by the client and are a part
/// of batch header. A request can either be in progress or completed
@@ -73,23 +76,16 @@ class request {
}
}

template<class ValueType>
void set_value(ValueType&& value) {
vassert(
_state <= request_state::in_progress && !_result.available(),
"unexpected request state during set: state: {}, result available: "
"{}",
static_cast<std::underlying_type_t<request_state>>(_state),
_result.available());
_result.set_value(std::forward<ValueType>(value));
_state = request_state::completed;
}
void set_value(request_result_t::value_type);
void set_error(request_result_t::error_type);
void mark_request_in_progress() { _state = request_state::in_progress; }
request_state state() const { return _state; }
result_promise_t::future_type result() const;

bool operator==(const request&) const;

friend std::ostream& operator<<(std::ostream&, const request&);

private:
request_state _state{request_state::initialized};
seq_t _first_sequence;
13 changes: 6 additions & 7 deletions src/v/cluster/rm_stm.cc
@@ -1065,7 +1065,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
auto expiration_it = _log_state.expiration.find(bid.pid);
if (expiration_it == _log_state.expiration.end()) {
vlog(_ctx_log.warn, "Can not find expiration info for pid:{}", bid.pid);
req_ptr->set_value(errc::generic_tx_error);
req_ptr->set_error(errc::generic_tx_error);
co_return errc::generic_tx_error;
}
expiration_it->second.last_update = clock_type::now();
@@ -1079,7 +1079,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
"got {} on replicating tx data batch for pid:{}",
r.error(),
bid.pid);
req_ptr->set_value(r.error());
req_ptr->set_error(r.error());
co_return r.error();
}
if (!co_await wait_no_throw(
Expand All @@ -1089,7 +1089,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
_ctx_log.warn,
"application of the replicated tx batch has timed out pid:{}",
bid.pid);
req_ptr->set_value(errc::timeout);
req_ptr->set_error(errc::timeout);
co_return tx_errc::timeout;
}
_mem_state.estimated.erase(bid.pid);
Expand Down Expand Up @@ -1179,7 +1179,6 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued,
ssx::semaphore_units& units) {
using ret_t = result<kafka_result>;
auto request = producer->try_emplace_request(bid, synced_term);
if (!request) {
co_return request.error();
@@ -1199,7 +1198,7 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
_ctx_log.warn,
"replication failed, request enqueue returned error: {}",
req_enqueued.get_exception());
req_ptr->set_value<ret_t>(errc::replication_error);
req_ptr->set_error(errc::replication_error);
co_return errc::replication_error;
}
units.return_all();
@@ -1209,13 +1208,13 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
if (replicated.failed()) {
vlog(
_ctx_log.warn, "replication failed: {}", replicated.get_exception());
req_ptr->set_value<ret_t>(errc::replication_error);
req_ptr->set_error(errc::replication_error);
co_return errc::replication_error;
}
auto result = replicated.get0();
if (result.has_error()) {
vlog(_ctx_log.warn, "replication failed: {}", result.error());
req_ptr->set_value<ret_t>(result.error());
req_ptr->set_error(result.error());
co_return result.error();
}
// translate to kafka offset.
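
Each failure branch above does two things: it completes the request's shared promise with the error, so any fiber waiting on request::result() unblocks, and it returns the same error to the immediate caller. A compact sketch of that dual-channel propagation with stand-in types (no Seastar):

```cpp
#include <cassert>
#include <optional>

enum class errc { none, replication_error };

struct toy_request {
    std::optional<errc> _error; // stand-in for the shared promise
    void set_error(errc e) {
        if (!_error) { // idempotent: the first error wins
            _error = e;
        }
    }
};

errc do_replicate(toy_request& req, bool replication_fails) {
    if (replication_fails) {
        req.set_error(errc::replication_error); // unblock the waiters
        return errc::replication_error;         // inform the caller
    }
    return errc::none;
}

int main() {
    toy_request req;
    auto rc = do_replicate(req, /*replication_fails=*/true);
    assert(rc == errc::replication_error);
    assert(req._error == errc::replication_error); // waiters see it too
}
```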