From 3ba7a3472510916c833854b250f2af3fe819205e Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Wed, 19 Jun 2024 22:56:42 -0400 Subject: [PATCH] kafka: improve handling of progress conditions in fetch The use of `ss::when_any` and `ss::promise<>` for each progress condition resulted in unneeded allocations as well as expensive broken promises and exception propagation. This commit changes all progress conditions to signal the same `ss::condition_variable` instead. This simplifies their handling and greatly reduces their cost. --- src/v/kafka/server/handlers/fetch.cc | 60 +++++++++++++++------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index cc7bb9cbadfa7..a6ba203ac9fda 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -45,7 +45,6 @@ #include #include #include -#include #include #include @@ -1011,12 +1010,19 @@ class fetch_worker { class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { public: nonpolling_fetch_plan_executor() - : _last_result_size(ss::smp::count, 0) {} + : _last_result_size(ss::smp::count, 0) + , _fetch_timeout{[this] { _has_progress.signal(); }} {} /** * Executes the supplied `plan` until `octx.should_stop_fetch` returns true. */ ss::future<> execute_plan(op_context& octx, fetch_plan plan) final { + if (!initialize_progress_conditions(octx)) { + // if the progress conditions were unable to be initialized then + // either the fetch has been aborted or the deadline was reached. + co_return; + } + start_worker_aborts(plan); co_await handle_exceptions(do_execute_plan(octx, std::move(plan))); @@ -1047,7 +1053,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { } for (;;) { - co_await progress_conditions(octx); + co_await wait_for_progress(); if (octx.should_stop_fetch() || _thrown_exception) { co_return; @@ -1092,45 +1098,43 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { } /** - * Determines if the should_stop_fetch() condition should be checked again. - * The return future is set if; + * Sets _has_progress to be signaled if; * - octx.deadline has been reached. * - _as has been aborted. - * - one of the shard workers has returned results. + * returns true if this was successful + * false otherwise */ - ss::future<> progress_conditions(op_context& octx) { - ss::promise<> timeout_pr, abort_pr; - + bool initialize_progress_conditions(op_context& octx) { // A connection can close and stop the sharded abort source before we // can subscribe to it. So we check here if that is the case and return // if so. if (!octx.rctx.abort_source().local_is_initialized()) { - co_return; + return false; } - auto abort_sub_opt = octx.rctx.abort_source().subscribe( - [&abort_pr]() noexcept { abort_pr.set_value(); }); + _fetch_abort_sub = octx.rctx.abort_source().subscribe( + [this]() noexcept { _has_progress.signal(); }); - if (!abort_sub_opt) { - abort_pr.set_value(); + if (!_fetch_abort_sub) { + return false; } - ss::timer s{ - [&timeout_pr] { timeout_pr.set_value(); }}; - if (octx.deadline) { - s.arm(octx.deadline.value()); + _fetch_timeout.arm(octx.deadline.value()); } - // Ignoring any exceptions from these futures is fine since they are - // only used to signal the coordinator that it should check if the fetch - // request should end or not. - co_await ss::when_any( - ignore_exceptions(_has_completed_shard_fetches.wait()), - ignore_exceptions(timeout_pr.get_future()), - ignore_exceptions(abort_pr.get_future())); + return true; } + /** + * Waits until the should_stop_fetch() condition should be checked again. + * The return future is set if; + * - octx.deadline has been reached. + * - _as has been aborted. + * - one of the shard workers has returned results. + */ + ss::future<> wait_for_progress() { return _has_progress.wait(); } + /* * `start_shard_fetch_worker` executes on the coordinator shard. It builds * the `shard_local_fetch_context` struct needed to start the @@ -1196,7 +1200,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { _last_result_size[fetch.shard] = results.total_size; _completed_shard_fetches.push_back(std::move(fetch)); - _has_completed_shard_fetches.signal(); + _has_progress.signal(); } static ss::future<> ignore_exceptions(ss::future<> fut) { @@ -1227,13 +1231,15 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { ss::gate _workers_gate; std::unordered_map _worker_aborts; - ss::condition_variable _has_completed_shard_fetches; + ss::condition_variable _has_progress; std::vector _completed_shard_fetches; std::vector _last_result_size; // If any child task throws an exception this holds on to the exception // until all child tasks have been stopped and its safe to rethrow the // exception. std::exception_ptr _thrown_exception; + ss::optimized_optional _fetch_abort_sub; + ss::timer _fetch_timeout; }; size_t op_context::fetch_partition_count() const {