kafka: improve handling of progress conditions in fetch
The use of `ss::when_any` and `ss::promise<>` for each progress
condition resulted in unneeded allocations as well as expensive broken
promises and exception propagation. This commit changes all progress
conditions to signal the same `ss::condition_variable` instead. This
simplifies their handling and greatly reduces their cost.
ballard26 committed Jun 20, 2024
1 parent 74b4cfe commit 3ba7a34
Showing 1 changed file with 33 additions and 27 deletions.
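
As a quick illustration of the pattern the commit message describes, here is a minimal self-contained sketch (hypothetical names such as `coordinator` and `has_progress`; not code from the Redpanda tree) in which the deadline timer, the abort subscription, and result delivery all signal a single `ss::condition_variable` that the coordinator loop waits on:

// Minimal sketch, assumed names; not the Redpanda sources.
#include <seastar/core/abort_source.hh>
#include <seastar/core/app-template.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/timer.hh>
#include <chrono>
#include <iostream>

namespace ss = seastar;
using namespace std::chrono_literals;

struct coordinator {
    ss::condition_variable has_progress;
    ss::abort_source as;
    // The timer's only job is to wake the loop, like _fetch_timeout below.
    ss::timer<> deadline{[this] { has_progress.signal(); }};
    bool results_ready = false; // stands in for completed shard fetches

    ss::future<> run() {
        // A disengaged subscription means the source was already aborted,
        // mirroring the initialize_progress_conditions() check below.
        auto abort_sub = as.subscribe(
          [this]() noexcept { has_progress.signal(); });
        if (!abort_sub) {
            co_return;
        }
        deadline.arm(50ms);
        for (;;) {
            // One cheap wait per wakeup; no promises are allocated and
            // none are left to be broken.
            co_await has_progress.wait();
            if (results_ready || as.abort_requested() || !deadline.armed()) {
                co_return;
            }
        }
    }
};

int main(int argc, char** argv) {
    ss::app_template app;
    return app.run(argc, argv, []() -> ss::future<> {
        coordinator c;
        co_await c.run();
        std::cout << "fetch loop done (deadline reached)\n";
    });
}

A single condition variable also makes spurious wakeups harmless: the loop simply re-checks its stop conditions and waits again, which is exactly what the diff below does in `do_execute_plan`.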
60 changes: 33 additions & 27 deletions src/v/kafka/server/handlers/fetch.cc
@@ -45,7 +45,6 @@
#include <seastar/core/scheduling.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/when_any.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/util/log.hh>

@@ -1011,12 +1010,19 @@ class fetch_worker {
class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
public:
nonpolling_fetch_plan_executor()
: _last_result_size(ss::smp::count, 0) {}
: _last_result_size(ss::smp::count, 0)
, _fetch_timeout{[this] { _has_progress.signal(); }} {}

/**
* Executes the supplied `plan` until `octx.should_stop_fetch` returns true.
*/
ss::future<> execute_plan(op_context& octx, fetch_plan plan) final {
if (!initialize_progress_conditions(octx)) {
// if the progress conditions could not be initialized then either
// the fetch has been aborted or the deadline was reached.
co_return;
}

start_worker_aborts(plan);

co_await handle_exceptions(do_execute_plan(octx, std::move(plan)));
@@ -1047,7 +1053,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
}

for (;;) {
co_await progress_conditions(octx);
co_await wait_for_progress();

if (octx.should_stop_fetch() || _thrown_exception) {
co_return;
@@ -1092,45 +1098,43 @@
}

/**
* Determines if the should_stop_fetch() condition should be checked again.
* The return future is set if:
* Sets _has_progress to be signaled if:
* - octx.deadline has been reached.
* - _as has been aborted.
* - one of the shard workers has returned results.
* returns true if this was successful,
* false otherwise
*/
ss::future<> progress_conditions(op_context& octx) {
ss::promise<> timeout_pr, abort_pr;

bool initialize_progress_conditions(op_context& octx) {
// A connection can close and stop the sharded abort source before we
// can subscribe to it. So we check here if that is the case and return
// if so.
if (!octx.rctx.abort_source().local_is_initialized()) {
co_return;
return false;
}

auto abort_sub_opt = octx.rctx.abort_source().subscribe(
[&abort_pr]() noexcept { abort_pr.set_value(); });
_fetch_abort_sub = octx.rctx.abort_source().subscribe(
[this]() noexcept { _has_progress.signal(); });

if (!abort_sub_opt) {
abort_pr.set_value();
if (!_fetch_abort_sub) {
return false;
}

ss::timer<model::timeout_clock> s{
[&timeout_pr] { timeout_pr.set_value(); }};

if (octx.deadline) {
s.arm(octx.deadline.value());
_fetch_timeout.arm(octx.deadline.value());
}

// Ignoring any exceptions from these futures is fine since they are
// only used to signal the coordinator that it should check if the fetch
// request should end or not.
co_await ss::when_any(
ignore_exceptions(_has_completed_shard_fetches.wait()),
ignore_exceptions(timeout_pr.get_future()),
ignore_exceptions(abort_pr.get_future()));
return true;
}

/**
* Waits until the should_stop_fetch() condition should be checked again.
* The return future is set if:
* - octx.deadline has been reached.
* - _as has been aborted.
* - one of the shard workers has returned results.
*/
ss::future<> wait_for_progress() { return _has_progress.wait(); }

/*
* `start_shard_fetch_worker` executes on the coordinator shard. It builds
* the `shard_local_fetch_context` struct needed to start the
@@ -1196,7 +1200,7 @@

_last_result_size[fetch.shard] = results.total_size;
_completed_shard_fetches.push_back(std::move(fetch));
_has_completed_shard_fetches.signal();
_has_progress.signal();
}

static ss::future<> ignore_exceptions(ss::future<> fut) {
@@ -1227,13 +1231,15 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {

ss::gate _workers_gate;
std::unordered_map<ss::shard_id, ss::abort_source> _worker_aborts;
ss::condition_variable _has_completed_shard_fetches;
ss::condition_variable _has_progress;
std::vector<shard_fetch> _completed_shard_fetches;
std::vector<size_t> _last_result_size;
// If any child task throws an exception, this holds on to the exception
// until all child tasks have been stopped and it's safe to rethrow the
// exception.
std::exception_ptr _thrown_exception;
ss::optimized_optional<ss::abort_source::subscription> _fetch_abort_sub;
ss::timer<model::timeout_clock> _fetch_timeout;
};

size_t op_context::fetch_partition_count() const {
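To see the "expensive broken promises" the commit message refers to, note that destroying a `ss::promise<>` before fulfilling it fails its future with `ss::broken_promise`. Under the old `ss::when_any` scheme, every condition that lost the race was abandoned this way, so each wakeup allocated, threw, and then discarded exceptions. A minimal standalone sketch of that behavior (hypothetical, not from the Redpanda tree):

// A promise destroyed before set_value() fails its future with
// ss::broken_promise; the old when_any-based loop paid this cost on
// every wakeup for each condition that lost the race.
#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <iostream>
#include <optional>
#include <utility>

namespace ss = seastar;

ss::future<> abandoned_condition() {
    std::optional<ss::promise<>> abort_pr;
    abort_pr.emplace();
    auto fut = abort_pr->get_future();
    abort_pr.reset(); // drop the losing condition unfulfilled
    try {
        co_await std::move(fut);
    } catch (const ss::broken_promise& e) {
        // The old code routed these through ignore_exceptions(); the
        // exception was still allocated and propagated first.
        std::cout << "swallowed: " << e.what() << "\n";
    }
}

int main(int argc, char** argv) {
    ss::app_template app;
    return app.run(argc, argv, [] { return abandoned_condition(); });
}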

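Relatedly, the null-subscription check in `initialize_progress_conditions` guards a real race: `ss::abort_source::subscribe` returns a disengaged `optimized_optional` when the source has already been aborted, so a caller must treat a null subscription as "fetch already aborted". A small sketch of that contract (assumed setup, not Redpanda code):

#include <seastar/core/abort_source.hh>
#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/util/optimized_optional.hh>
#include <iostream>

namespace ss = seastar;

int main(int argc, char** argv) {
    ss::app_template app;
    return app.run(argc, argv, [] {
        ss::abort_source as;

        // Subscribing before any abort yields an engaged subscription.
        auto sub = as.subscribe([]() noexcept {
            std::cout << "abort callback fired\n";
        });
        std::cout << "first subscribe engaged: " << bool(sub) << "\n"; // 1

        as.request_abort(); // runs the callback above

        // Subscribing after the abort yields a disengaged optional; the
        // fetch path treats this as "already aborted" and bails out.
        auto late = as.subscribe([]() noexcept {});
        std::cout << "late subscribe engaged: " << bool(late) << "\n"; // 0

        return ss::make_ready_future<>();
    });
}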