Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of progress conditions in non-polling fetch #19930

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 33 additions & 27 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include <seastar/core/scheduling.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/when_any.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/util/log.hh>

Expand Down Expand Up @@ -1012,7 +1011,8 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
public:
explicit nonpolling_fetch_plan_executor(bool debounce = false)
: _last_result_size(ss::smp::count, 0)
, _debounce(debounce) {}
, _debounce(debounce)
, _fetch_timeout{[this] { _has_progress.signal(); }} {}

/**
* Executes the supplied `plan` until `octx.should_stop_fetch` returns true.
Expand All @@ -1024,6 +1024,12 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
octx.request.data.max_wait_ms));
}

if (!initialize_progress_conditions(octx)) {
// If the progress conditions could not be initialized, then either
// the fetch has been aborted or the deadline was reached.
co_return;
}

start_worker_aborts(plan);
co_await handle_exceptions(do_execute_plan(octx, std::move(plan)));

Expand Down Expand Up @@ -1053,7 +1059,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
}

for (;;) {
co_await progress_conditions(octx);
co_await wait_for_progress();

if (octx.should_stop_fetch() || _thrown_exception) {
co_return;
Expand Down Expand Up @@ -1098,45 +1104,43 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
}

/**
* Determines if the should_stop_fetch() condition should be checked again.
* The return future is set if;
* Sets _has_progress to be signaled if:
* - octx.deadline has been reached.
* - the request's abort source (octx.rctx.abort_source()) has been aborted.
* - one of the shard workers has returned results.
* Returns true if this was successful,
* false otherwise.
*/
ss::future<> progress_conditions(op_context& octx) {
ss::promise<> timeout_pr, abort_pr;

bool initialize_progress_conditions(op_context& octx) {
// A connection can close and stop the sharded abort source before we
// can subscribe to it. So we check here if that is the case and return
// if so.
if (!octx.rctx.abort_source().local_is_initialized()) {
co_return;
return false;
}

auto abort_sub_opt = octx.rctx.abort_source().subscribe(
[&abort_pr]() noexcept { abort_pr.set_value(); });
_fetch_abort_sub = octx.rctx.abort_source().subscribe(
[this]() noexcept { _has_progress.signal(); });

if (!abort_sub_opt) {
abort_pr.set_value();
if (!_fetch_abort_sub) {
return false;
}

ss::timer<model::timeout_clock> s{
[&timeout_pr] { timeout_pr.set_value(); }};

if (octx.deadline) {
s.arm(octx.deadline.value());
_fetch_timeout.arm(octx.deadline.value());
}

// Ignoring any exceptions from these futures is fine since they are
// only used to signal the coordinator that it should check if the fetch
// request should end or not.
co_await ss::when_any(
ignore_exceptions(_has_completed_shard_fetches.wait()),
ignore_exceptions(timeout_pr.get_future()),
ignore_exceptions(abort_pr.get_future()));
return true;
}

/**
 * Suspends the caller until should_stop_fetch() is worth re-checking.
 * The returned future comes from waiting on _has_progress, which is
 * signaled when:
 * - the fetch timeout timer (armed with octx.deadline) fires,
 * - the subscription on the request's abort source is triggered, or
 * - a shard worker has handed back results.
 */
ss::future<> wait_for_progress() {
    return _has_progress.wait();
}

/*
* `start_shard_fetch_worker` executes on the coordinator shard. It builds
* the `shard_local_fetch_context` struct needed to start the
Expand Down Expand Up @@ -1202,7 +1206,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {

_last_result_size[fetch.shard] = results.total_size;
_completed_shard_fetches.push_back(std::move(fetch));
_has_completed_shard_fetches.signal();
_has_progress.signal();
}

static ss::future<> ignore_exceptions(ss::future<> fut) {
Expand Down Expand Up @@ -1233,14 +1237,16 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {

ss::gate _workers_gate;
std::unordered_map<ss::shard_id, ss::abort_source> _worker_aborts;
ss::condition_variable _has_completed_shard_fetches;
ss::condition_variable _has_progress;
std::vector<shard_fetch> _completed_shard_fetches;
std::vector<size_t> _last_result_size;
// If any child task throws an exception this holds on to the exception
// until all child tasks have been stopped and its safe to rethrow the
// exception.
std::exception_ptr _thrown_exception;
bool _debounce;
ss::optimized_optional<ss::abort_source::subscription> _fetch_abort_sub;
ss::timer<model::timeout_clock> _fetch_timeout;
};

size_t op_context::fetch_partition_count() const {
Expand Down