From 568c78fa7ee92a8c3593c51e501bad0cb6250237 Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Fri, 26 Apr 2024 11:04:04 +0100 Subject: [PATCH 1/5] Revert "treewide: remove old fetch impl and mark fetch_read_strategy as deprecated" This reverts commit c735156359cb95974a367475a1d5d5f080fce0f5. There are scenarios where polling with high debounce can be used to limit fetch perf impact (at the cost of high latency). (cherry picked from commit 03a9f270133d06786c0802dd7005284414a34a59) --- src/v/config/configuration.cc | 14 ++- src/v/config/configuration.h | 2 +- src/v/config/convert.h | 31 ++++++ src/v/config/property.h | 2 + src/v/config/rjson_serialization.cc | 5 + src/v/config/rjson_serialization.h | 3 + src/v/kafka/server/handlers/fetch.cc | 135 ++++++++++++++++++++++++++- src/v/model/metadata.h | 19 ++++ src/v/model/model.cc | 18 ++++ 9 files changed, 222 insertions(+), 7 deletions(-) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index f380bcac5f44..4683b848d2a5 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -630,7 +630,19 @@ configuration::configuration() "wasn't reached", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, 1ms) - , fetch_read_strategy(*this, "fetch_read_strategy") + , fetch_read_strategy( + *this, + "fetch_read_strategy", + "The strategy used to fulfill fetch requests", + {.needs_restart = needs_restart::no, + .example = model::fetch_read_strategy_to_string( + model::fetch_read_strategy::non_polling), + .visibility = visibility::tunable}, + model::fetch_read_strategy::non_polling, + { + model::fetch_read_strategy::polling, + model::fetch_read_strategy::non_polling, + }) , alter_topic_cfg_timeout_ms( *this, "alter_topic_cfg_timeout_ms", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 9af63bd2e3dc..159004eef8b6 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -144,7 +144,7 @@ struct configuration
final : public config_store { property tx_timeout_delay_ms; deprecated_property rm_violation_recovery_policy; property fetch_reads_debounce_timeout; - deprecated_property fetch_read_strategy; + enum_property fetch_read_strategy; property alter_topic_cfg_timeout_ms; property log_cleanup_policy; enum_property log_message_timestamp_type; diff --git a/src/v/config/convert.h b/src/v/config/convert.h index 9263205a059b..cc0d03940999 100644 --- a/src/v/config/convert.h +++ b/src/v/config/convert.h @@ -517,6 +517,37 @@ struct convert { } }; +template<> +struct convert { + using type = model::fetch_read_strategy; + + static constexpr auto acceptable_values = std::to_array( + {model::fetch_read_strategy_to_string(type::polling), + model::fetch_read_strategy_to_string(type::non_polling)}); + + static Node encode(const type& rhs) { return Node(fmt::format("{}", rhs)); } + + static bool decode(const Node& node, type& rhs) { + auto value = node.as(); + + if ( + std::find(acceptable_values.begin(), acceptable_values.end(), value) + == acceptable_values.end()) { + return false; + } + + rhs = string_switch(std::string_view{value}) + .match( + model::fetch_read_strategy_to_string(type::polling), + type::polling) + .match( + model::fetch_read_strategy_to_string(type::non_polling), + type::non_polling); + + return true; + } +}; + template<> struct convert { using type = model::write_caching_mode; diff --git a/src/v/config/property.h b/src/v/config/property.h index dafe9d989601..8f4003da7feb 100644 --- a/src/v/config/property.h +++ b/src/v/config/property.h @@ -648,6 +648,8 @@ consteval std::string_view property_type_name() { pandaproxy::schema_registry:: schema_id_validation_mode>) { return "string"; + } else if constexpr (std::is_same_v) { + return "string"; } else if constexpr (std::is_same_v) { return "string"; } else if constexpr (std:: diff --git a/src/v/config/rjson_serialization.cc b/src/v/config/rjson_serialization.cc index c79c0c05c97d..9b0cc4fc380b 100644 --- 
a/src/v/config/rjson_serialization.cc +++ b/src/v/config/rjson_serialization.cc @@ -160,6 +160,11 @@ void rjson_serialize( stringize(w, v); } +void rjson_serialize( + json::Writer& w, const model::fetch_read_strategy& v) { + stringize(w, v); +} + void rjson_serialize( json::Writer& w, const model::write_caching_mode& v) { stringize(w, v); diff --git a/src/v/config/rjson_serialization.h b/src/v/config/rjson_serialization.h index cf96413131de..3f5bccb45004 100644 --- a/src/v/config/rjson_serialization.h +++ b/src/v/config/rjson_serialization.h @@ -76,6 +76,9 @@ void rjson_serialize( void rjson_serialize( json::Writer& w, const model::leader_balancer_mode& v); +void rjson_serialize( + json::Writer& w, const model::fetch_read_strategy& v); + void rjson_serialize( json::Writer& w, const model::write_caching_mode& v); diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index b8ea6ca44100..f11ecc7473c2 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -603,6 +603,73 @@ bool shard_fetch::empty() const { return requests.empty(); } +/** + * Top-level handler for fetching from single shard. The result is + * unwrapped and any errors from the storage sub-system are translated + * into kafka specific response codes. On failure or success the + * partition response is finalized and placed into its position in the + * response message. + */ +static ss::future<> +handle_shard_fetch(ss::shard_id shard, op_context& octx, shard_fetch fetch) { + // if over budget skip the fetch. 
+ if (octx.bytes_left <= 0) { + return ss::now(); + } + // no requests for this shard, do nothing + if (fetch.empty()) { + return ss::now(); + } + + const bool foreign_read = shard != ss::this_shard_id(); + + // dispatch to remote core + return octx.rctx.partition_manager() + .invoke_on( + shard, + octx.ssg, + [foreign_read, configs = std::move(fetch.requests), &octx]( + cluster::partition_manager& mgr) mutable { + // &octx is captured only to immediately use its accessors here so + // that there is a list of all objects accessed next to `invoke_on`. + // This is meant to help avoiding unintended cross shard access + return fetch_ntps_in_parallel( + mgr, + octx.rctx.server().local().get_replica_selector(), + std::move(configs), + foreign_read, + octx.deadline, + octx.bytes_left, + octx.rctx.server().local().memory(), + octx.rctx.server().local().memory_fetch_sem()); + }) + .then([responses = std::move(fetch.responses), + start_time = fetch.start_time, + &octx](std::vector results) mutable { + fill_fetch_responses(octx, std::move(results), responses, start_time); + }); +} + +class parallel_fetch_plan_executor final : public fetch_plan_executor::impl { + ss::future<> execute_plan(op_context& octx, fetch_plan plan) final { + std::vector> fetches; + fetches.reserve(ss::smp::count); + + // start fetching from random shard to make sure that we fetch data from + // all the partition even if we reach fetch message size limit + const ss::shard_id start_shard_idx = random_generators::get_int( + ss::smp::count - 1); + for (size_t i = 0; i < ss::smp::count; ++i) { + auto shard = (start_shard_idx + i) % ss::smp::count; + + fetches.push_back(handle_shard_fetch( + shard, octx, std::move(plan.fetches_per_shard[shard]))); + } + + return ss::when_all_succeed(fetches.begin(), fetches.end()); + } +}; + class fetch_worker { public: // Passed from the coordinator shard to fetch workers. 
@@ -1285,6 +1352,49 @@ class simple_fetch_planner final : public fetch_planner::impl { } }; +/** + * Process partition fetch requests. + * + * Each request is handled serially in the order they appear in the request. + * There are a couple reasons why we are not **yet** processing these in + * parallel. First, Kafka expects to some extent that the order of the + * partitions in the request is an implicit priority on which partitions to + * read from. This is closely related to the request budget limits specified + * in terms of maximum bytes and maximum time delay. + * + * Once we start processing requests in parallel we'll have to work through + * various challenges. First, once we dispatch in parallel, we'll need to + * develop heuristics for dealing with the implicit priority order. We'll + * also need to develop techniques and heuristics for dealing with budgets + * since global budgets aren't trivially divisible onto each core when + * partition requests may produce non-uniform amounts of data. + * + * w.r.t. what is needed to parallelize this, there are no data dependencies + * between partition requests within the fetch request, and so they can be + * run fully in parallel. The only dependency that exists is that the + * response must be reassembled such that the responses appear in these + * order as the partitions in the request. 
+ */ +static ss::future<> fetch_topic_partitions(op_context& octx) { + auto planner = make_fetch_planner(); + + auto fetch_plan = planner.create_plan(octx); + + fetch_plan_executor executor + = make_fetch_plan_executor(); + co_await executor.execute_plan(octx, std::move(fetch_plan)); + + if (octx.should_stop_fetch()) { + co_return; + } + + octx.reset_context(); + // debounce next read retry + co_await ss::sleep(std::min( + config::shard_local_cfg().fetch_reads_debounce_timeout(), + octx.request.data.max_wait_ms)); +} + namespace testing { kafka::fetch_plan make_simple_fetch_plan(op_context& octx) { auto planner = make_fetch_planner(); @@ -1294,11 +1404,26 @@ kafka::fetch_plan make_simple_fetch_plan(op_context& octx) { namespace { ss::future<> do_fetch(op_context& octx) { - auto planner = make_fetch_planner(); - auto fetch_plan = planner.create_plan(octx); - - nonpolling_fetch_plan_executor executor; - co_await executor.execute_plan(octx, std::move(fetch_plan)); + switch (config::shard_local_cfg().fetch_read_strategy) { + case model::fetch_read_strategy::polling: { + // first fetch, do not wait + co_await fetch_topic_partitions(octx).then([&octx] { + return ss::do_until( + [&octx] { return octx.should_stop_fetch(); }, + [&octx] { return fetch_topic_partitions(octx); }); + }); + } break; + case model::fetch_read_strategy::non_polling: { + auto planner = make_fetch_planner(); + auto fetch_plan = planner.create_plan(octx); + + nonpolling_fetch_plan_executor executor; + co_await executor.execute_plan(octx, std::move(fetch_plan)); + } break; + default: { + vassert(false, "not implemented"); + } break; + } } } // namespace diff --git a/src/v/model/metadata.h b/src/v/model/metadata.h index bcd83b9b172a..115e117dc588 100644 --- a/src/v/model/metadata.h +++ b/src/v/model/metadata.h @@ -546,6 +546,25 @@ operator<<(std::ostream& os, cloud_storage_chunk_eviction_strategy st) { } } +enum class fetch_read_strategy : uint8_t { + polling = 0, + non_polling = 1, +}; + +constexpr 
const char* fetch_read_strategy_to_string(fetch_read_strategy s) { + switch (s) { + case fetch_read_strategy::polling: + return "polling"; + case fetch_read_strategy::non_polling: + return "non_polling"; + default: + throw std::invalid_argument("unknown fetch_read_strategy"); + } +} + +std::ostream& operator<<(std::ostream&, fetch_read_strategy); +std::istream& operator>>(std::istream&, fetch_read_strategy&); + /** * Type representing MPX virtual cluster. MPX uses XID to identify clusters. */ diff --git a/src/v/model/model.cc b/src/v/model/model.cc index 6890b127be3d..c76855e544e7 100644 --- a/src/v/model/model.cc +++ b/src/v/model/model.cc @@ -485,6 +485,24 @@ std::ostream& operator<<(std::ostream& o, const batch_identity& bid) { return o; } +std::ostream& operator<<(std::ostream& o, fetch_read_strategy s) { + o << fetch_read_strategy_to_string(s); + return o; +} + +std::istream& operator>>(std::istream& i, fetch_read_strategy& strat) { + ss::sstring s; + i >> s; + strat = string_switch(s) + .match( + fetch_read_strategy_to_string(fetch_read_strategy::polling), + fetch_read_strategy::polling) + .match( + fetch_read_strategy_to_string(fetch_read_strategy::non_polling), + fetch_read_strategy::non_polling); + return i; +} + std::ostream& operator<<(std::ostream& o, write_caching_mode mode) { o << write_caching_mode_to_string(mode); return o; From 4629f30573e460ea14a0fef1546686cf3ee29b59 Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Fri, 26 Apr 2024 11:22:53 +0100 Subject: [PATCH 2/5] Revert "kafka: remove existing fetch latency measurement" This reverts commit dd4676aa3a41e44e23d9b73c79670f85a1fe0d5d. 
Will be replaced with 23.3 commit (cherry picked from commit 7262ff8c755ae14022c110783362f183f237def1) --- src/v/kafka/server/handlers/fetch.cc | 12 ++++++++++-- src/v/kafka/server/handlers/fetch.h | 10 ++++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index f11ecc7473c2..12770a470ed7 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -422,7 +422,8 @@ read_result::memory_units_t reserve_memory_units( static void fill_fetch_responses( op_context& octx, std::vector results, - const std::vector& responses) { + const std::vector& responses, + op_context::latency_point start_time) { auto range = boost::irange(0, results.size()); if (unlikely(results.size() != responses.size())) { // soft assert & recovery attempt @@ -518,6 +519,10 @@ static void fill_fetch_responses( } resp_it->set(std::move(resp)); + std::chrono::microseconds fetch_latency + = std::chrono::duration_cast( + op_context::latency_clock::now() - start_time); + octx.rctx.probe().record_fetch_latency(fetch_latency); } } @@ -1151,7 +1156,10 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { }); fill_fetch_responses( - octx, std::move(results.read_results), fetch.responses); + octx, + std::move(results.read_results), + fetch.responses, + fetch.start_time); octx.rctx.probe().record_fetch_latency( results.first_run_latency_result); diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index 285886782541..dda2aef56ed1 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -331,6 +331,9 @@ struct read_result { // struct aggregating fetch requests and corresponding response iterators for // the same shard struct shard_fetch { + explicit shard_fetch(op_context::latency_point start_time) + : start_time{start_time} {} + void push_back( ntp_fetch_config config, 
op_context::response_placeholder_ptr r_ph) { requests.push_back(std::move(config)); @@ -346,6 +349,7 @@ struct shard_fetch { ss::shard_id shard; std::vector requests; std::vector responses; + op_context::latency_point start_time; friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) { fmt::print(o, "{}", sf.requests); @@ -354,8 +358,10 @@ struct shard_fetch { }; struct fetch_plan { - explicit fetch_plan(size_t shards) - : fetches_per_shard(shards, shard_fetch()) { + explicit fetch_plan( + size_t shards, + op_context::latency_point start_time = op_context::latency_clock::now()) + : fetches_per_shard(shards, shard_fetch(start_time)) { for (size_t i = 0; i < fetches_per_shard.size(); i++) { fetches_per_shard[i].shard = i; } From 04c8c222dd7aa7f94c66d897eb646005e1800d82 Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Fri, 26 Apr 2024 11:23:08 +0100 Subject: [PATCH 3/5] Revert "kafka: redefine fetch latency metric" This reverts commit 44bde8d839cac08b05b8e59fab65140d577ee209. Will be replaced with 23.3 commit (cherry picked from commit 0cacd8369300232ca50059a47a2d770079d1a3f9) --- src/v/kafka/server/handlers/fetch.cc | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 12770a470ed7..9b0ca9717d0c 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -707,8 +707,6 @@ class fetch_worker { std::vector read_results; // The total amount of bytes read across all results in `read_results`. size_t total_size; - // The time it took for the first `fetch_ntps_in_parallel` to complete - std::chrono::microseconds first_run_latency_result; }; ss::future run() { @@ -870,7 +868,6 @@ class fetch_worker { ss::future do_run() { bool first_run{true}; - std::chrono::microseconds first_run_latency_result{0}; // A map of indexes in `requests` to their corresponding index in // `_ctx.requests`. 
std::vector requests_map; @@ -897,11 +894,6 @@ class fetch_worker { _completed_waiter_count.current()); } - std::optional start_time; - if (first_run) { - start_time = op_context::latency_clock::now(); - } - auto q_results = co_await query_requests(std::move(requests)); if (first_run) { results = std::move(q_results.results); @@ -909,9 +901,6 @@ class fetch_worker { _last_visible_indexes = std::move( q_results.last_visible_indexes); - first_run_latency_result - = std::chrono::duration_cast( - op_context::latency_clock::now() - *start_time); } else { // Override the older results of the partitions with the newly // queried results. @@ -933,7 +922,6 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, - .first_run_latency_result = first_run_latency_result, }; } @@ -955,7 +943,6 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, - .first_run_latency_result = first_run_latency_result, }; } @@ -965,7 +952,6 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, - .first_run_latency_result = first_run_latency_result, }; } @@ -1161,9 +1147,6 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { fetch.responses, fetch.start_time); - octx.rctx.probe().record_fetch_latency( - results.first_run_latency_result); - _last_result_size[fetch.shard] = results.total_size; _completed_shard_fetches.push_back(std::move(fetch)); _has_completed_shard_fetches.signal(); From d8fbcb76ddec78cba206ebaccc62ff01ba2b0234 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 9 Apr 2024 20:45:45 -0400 Subject: [PATCH 4/5] kafka: redefine fetch latency metric The `kafka_latency_fetch_latency` metric originally measured the time it'd take to complete one fetch poll. A fetch poll would create a fetch plan then execute it in parallel on every shard. 
On a given shard `fetch_ntps_in_parallel` would account for the majority of the execution time of the plan. Since fetches are no longer implemented by polling there isn't an exactly equivalent measurement that can be assigned to the metric. This commit instead records the duration of the first call to `fetch_ntps_in_parallel` on every shard to the metric. This first call takes as long as it would during a fetch poll. Hence the resulting measurement should be close to the duration of a fetch poll. (cherry picked from commit 44bde8d839cac08b05b8e59fab65140d577ee209) (cherry picked from commit afd1c9a8947ade4514bb0b8cf588c27a98483e70) --- src/v/kafka/server/handlers/fetch.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 9b0ca9717d0c..12770a470ed7 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -707,6 +707,8 @@ class fetch_worker { std::vector read_results; // The total amount of bytes read across all results in `read_results`. size_t total_size; + // The time it took for the first `fetch_ntps_in_parallel` to complete + std::chrono::microseconds first_run_latency_result; }; ss::future run() { @@ -868,6 +870,7 @@ class fetch_worker { ss::future do_run() { bool first_run{true}; + std::chrono::microseconds first_run_latency_result{0}; // A map of indexes in `requests` to their corresponding index in // `_ctx.requests`. 
std::vector requests_map; @@ -894,6 +897,11 @@ class fetch_worker { _completed_waiter_count.current()); } + std::optional start_time; + if (first_run) { + start_time = op_context::latency_clock::now(); + } + auto q_results = co_await query_requests(std::move(requests)); if (first_run) { results = std::move(q_results.results); @@ -901,6 +909,9 @@ class fetch_worker { _last_visible_indexes = std::move( q_results.last_visible_indexes); + first_run_latency_result + = std::chrono::duration_cast( + op_context::latency_clock::now() - *start_time); } else { // Override the older results of the partitions with the newly // queried results. @@ -922,6 +933,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -943,6 +955,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -952,6 +965,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -1147,6 +1161,9 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { fetch.responses, fetch.start_time); + octx.rctx.probe().record_fetch_latency( + results.first_run_latency_result); + _last_result_size[fetch.shard] = results.total_size; _completed_shard_fetches.push_back(std::move(fetch)); _has_completed_shard_fetches.signal(); From f89d7520096193dbb57e1ae47e35c669dbcbaa6f Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Thu, 25 Apr 2024 11:19:29 -0400 Subject: [PATCH 5/5] kafka: add option to record latency in fill_fetch_responses (cherry picked from commit de1c248d37aec1670958d5d2aefd45e428a1f9db) --- src/v/kafka/server/handlers/fetch.cc | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git 
a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 12770a470ed7..284c4aea70f5 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -423,7 +423,8 @@ static void fill_fetch_responses( op_context& octx, std::vector results, const std::vector& responses, - op_context::latency_point start_time) { + op_context::latency_point start_time, + bool record_latency = true) { auto range = boost::irange(0, results.size()); if (unlikely(results.size() != responses.size())) { // soft assert & recovery attempt @@ -519,10 +520,13 @@ static void fill_fetch_responses( } resp_it->set(std::move(resp)); - std::chrono::microseconds fetch_latency - = std::chrono::duration_cast( - op_context::latency_clock::now() - start_time); - octx.rctx.probe().record_fetch_latency(fetch_latency); + + if (record_latency) { + std::chrono::microseconds fetch_latency + = std::chrono::duration_cast( + op_context::latency_clock::now() - start_time); + octx.rctx.probe().record_fetch_latency(fetch_latency); + } } } @@ -1159,7 +1163,8 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { octx, std::move(results.read_results), fetch.responses, - fetch.start_time); + fetch.start_time, + false); octx.rctx.probe().record_fetch_latency( results.first_run_latency_result);