Skip to content

Commit

Permalink
Merge pull request redpanda-data#19930 from ballard26/fetch-opt-2
Browse files Browse the repository at this point in the history
Improve handling of progress conditions in non-polling fetch
  • Loading branch information
piyushredpanda authored Jul 3, 2024
2 parents 873d601 + 8fd8fcd commit 146b83d
Showing 1 changed file with 33 additions and 27 deletions.
60 changes: 33 additions & 27 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include <seastar/core/scheduling.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/when_any.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/util/log.hh>

Expand Down Expand Up @@ -1012,7 +1011,8 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
public:
explicit nonpolling_fetch_plan_executor(bool debounce = false)
: _last_result_size(ss::smp::count, 0)
, _debounce(debounce) {}
, _debounce(debounce)
, _fetch_timeout{[this] { _has_progress.signal(); }} {}

/**
* Executes the supplied `plan` until `octx.should_stop_fetch` returns true.
Expand All @@ -1024,6 +1024,12 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
octx.request.data.max_wait_ms));
}

if (!initialize_progress_conditions(octx)) {
// if the progress conditions were unable to be initialized then
// either the fetch has been aborted or the deadline was reached.
co_return;
}

start_worker_aborts(plan);
co_await handle_exceptions(do_execute_plan(octx, std::move(plan)));

Expand Down Expand Up @@ -1053,7 +1059,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
}

for (;;) {
co_await progress_conditions(octx);
co_await wait_for_progress();

if (octx.should_stop_fetch() || _thrown_exception) {
co_return;
Expand Down Expand Up @@ -1098,45 +1104,43 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
}

/**
* Determines if the should_stop_fetch() condition should be checked again.
* The return future is set if;
* Sets _has_progress to be signaled if;
* - octx.deadline has been reached.
* - _as has been aborted.
* - one of the shard workers has returned results.
* returns true if this was successful
* false otherwise
*/
ss::future<> progress_conditions(op_context& octx) {
ss::promise<> timeout_pr, abort_pr;

bool initialize_progress_conditions(op_context& octx) {
// A connection can close and stop the sharded abort source before we
// can subscribe to it. So we check here if that is the case and return
// if so.
if (!octx.rctx.abort_source().local_is_initialized()) {
co_return;
return false;
}

auto abort_sub_opt = octx.rctx.abort_source().subscribe(
[&abort_pr]() noexcept { abort_pr.set_value(); });
_fetch_abort_sub = octx.rctx.abort_source().subscribe(
[this]() noexcept { _has_progress.signal(); });

if (!abort_sub_opt) {
abort_pr.set_value();
if (!_fetch_abort_sub) {
return false;
}

ss::timer<model::timeout_clock> s{
[&timeout_pr] { timeout_pr.set_value(); }};

if (octx.deadline) {
s.arm(octx.deadline.value());
_fetch_timeout.arm(octx.deadline.value());
}

// Ignoring any exceptions from these futures is fine since they are
// only used to signal the coordinator that it should check if the fetch
// request should end or not.
co_await ss::when_any(
ignore_exceptions(_has_completed_shard_fetches.wait()),
ignore_exceptions(timeout_pr.get_future()),
ignore_exceptions(abort_pr.get_future()));
return true;
}

/**
* Waits until the should_stop_fetch() condition should be checked again.
* The return future is set if;
* - octx.deadline has been reached.
* - _as has been aborted.
* - one of the shard workers has returned results.
*/
ss::future<> wait_for_progress() { return _has_progress.wait(); }

/*
* `start_shard_fetch_worker` executes on the coordinator shard. It builds
* the `shard_local_fetch_context` struct needed to start the
Expand Down Expand Up @@ -1202,7 +1206,7 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {

_last_result_size[fetch.shard] = results.total_size;
_completed_shard_fetches.push_back(std::move(fetch));
_has_completed_shard_fetches.signal();
_has_progress.signal();
}

static ss::future<> ignore_exceptions(ss::future<> fut) {
Expand Down Expand Up @@ -1233,14 +1237,16 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {

ss::gate _workers_gate;
std::unordered_map<ss::shard_id, ss::abort_source> _worker_aborts;
ss::condition_variable _has_completed_shard_fetches;
ss::condition_variable _has_progress;
std::vector<shard_fetch> _completed_shard_fetches;
std::vector<size_t> _last_result_size;
// If any child task throws an exception this holds on to the exception
// until all child tasks have been stopped and its safe to rethrow the
// exception.
std::exception_ptr _thrown_exception;
bool _debounce;
ss::optimized_optional<ss::abort_source::subscription> _fetch_abort_sub;
ss::timer<model::timeout_clock> _fetch_timeout;
};

size_t op_context::fetch_partition_count() const {
Expand Down

0 comments on commit 146b83d

Please sign in to comment.