[v24.1.x] Stephan/revert polling removal #18176

Merged
14 changes: 13 additions & 1 deletion src/v/config/configuration.cc
@@ -630,7 +630,19 @@ configuration::configuration()
"wasn't reached",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
1ms)
, fetch_read_strategy(*this, "fetch_read_strategy")
, fetch_read_strategy(
*this,
"fetch_read_strategy",
"The strategy used to fulfill fetch requests",
{.needs_restart = needs_restart::no,
.example = model::fetch_read_strategy_to_string(
model::fetch_read_strategy::non_polling),
.visibility = visibility::tunable},
model::fetch_read_strategy::non_polling,
{
model::fetch_read_strategy::polling,
model::fetch_read_strategy::non_polling,
})
, alter_topic_cfg_timeout_ms(
*this,
"alter_topic_cfg_timeout_ms",
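For orientation, here is a minimal sketch of how the restored property is consumed at runtime. `config::shard_local_cfg()` and the `fetch_read_strategy` enum come from this PR's files; the wrapper function itself is illustrative only. Because the property is declared with `needs_restart::no`, its value can change between calls without a broker restart.

```cpp
#include "config/configuration.h"
#include "model/metadata.h"

// Illustrative helper (not part of the diff): read the live cluster config
// and report whether the polling fetch path is currently selected.
inline bool polling_fetch_enabled() {
    return config::shard_local_cfg().fetch_read_strategy()
           == model::fetch_read_strategy::polling;
}
```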
2 changes: 1 addition & 1 deletion src/v/config/configuration.h
@@ -144,7 +144,7 @@ struct configuration final : public config_store {
property<std::chrono::milliseconds> tx_timeout_delay_ms;
deprecated_property rm_violation_recovery_policy;
property<std::chrono::milliseconds> fetch_reads_debounce_timeout;
deprecated_property fetch_read_strategy;
enum_property<model::fetch_read_strategy> fetch_read_strategy;
property<std::chrono::milliseconds> alter_topic_cfg_timeout_ms;
property<model::cleanup_policy_bitflags> log_cleanup_policy;
enum_property<model::timestamp_type> log_message_timestamp_type;
31 changes: 31 additions & 0 deletions src/v/config/convert.h
@@ -517,6 +517,37 @@ struct convert<pandaproxy::schema_registry::schema_id_validation_mode> {
}
};

template<>
struct convert<model::fetch_read_strategy> {
using type = model::fetch_read_strategy;

static constexpr auto acceptable_values = std::to_array(
{model::fetch_read_strategy_to_string(type::polling),
model::fetch_read_strategy_to_string(type::non_polling)});

static Node encode(const type& rhs) { return Node(fmt::format("{}", rhs)); }

static bool decode(const Node& node, type& rhs) {
auto value = node.as<std::string>();

if (
std::find(acceptable_values.begin(), acceptable_values.end(), value)
== acceptable_values.end()) {
return false;
}

rhs = string_switch<type>(std::string_view{value})
.match(
model::fetch_read_strategy_to_string(type::polling),
type::polling)
.match(
model::fetch_read_strategy_to_string(type::non_polling),
type::non_polling);

return true;
}
};
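A small usage sketch for the converter above, assuming it is the usual `YAML::convert` specialization consumed by the config layer; the snippet and its sample values are illustrative, not part of the diff.

```cpp
#include <cassert>
#include <yaml-cpp/yaml.h>

#include "config/convert.h"

// Illustrative only: decode raw YAML scalars into the enum. Unknown strings
// make decode() return false up front instead of reaching string_switch,
// which would otherwise throw on a value it does not match.
void decode_example() {
    model::fetch_read_strategy strategy{};

    assert(YAML::convert<model::fetch_read_strategy>::decode(
      YAML::Load("non_polling"), strategy));
    assert(strategy == model::fetch_read_strategy::non_polling);

    // A value outside acceptable_values is rejected.
    assert(!YAML::convert<model::fetch_read_strategy>::decode(
      YAML::Load("busy_polling"), strategy));
}
```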

template<>
struct convert<model::write_caching_mode> {
using type = model::write_caching_mode;
2 changes: 2 additions & 0 deletions src/v/config/property.h
@@ -648,6 +648,8 @@ consteval std::string_view property_type_name() {
pandaproxy::schema_registry::
schema_id_validation_mode>) {
return "string";
} else if constexpr (std::is_same_v<type, model::fetch_read_strategy>) {
return "string";
} else if constexpr (std::is_same_v<type, model::write_caching_mode>) {
return "string";
} else if constexpr (std::
5 changes: 5 additions & 0 deletions src/v/config/rjson_serialization.cc
@@ -160,6 +160,11 @@ void rjson_serialize(
stringize(w, v);
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::fetch_read_strategy& v) {
stringize(w, v);
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::write_caching_mode& v) {
stringize(w, v);
3 changes: 3 additions & 0 deletions src/v/config/rjson_serialization.h
@@ -76,6 +76,9 @@ void rjson_serialize(
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::leader_balancer_mode& v);

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::fetch_read_strategy& v);

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::write_caching_mode& v);

152 changes: 145 additions & 7 deletions src/v/kafka/server/handlers/fetch.cc
@@ -422,7 +422,9 @@ read_result::memory_units_t reserve_memory_units(
static void fill_fetch_responses(
op_context& octx,
std::vector<read_result> results,
const std::vector<op_context::response_placeholder_ptr>& responses) {
const std::vector<op_context::response_placeholder_ptr>& responses,
op_context::latency_point start_time,
bool record_latency = true) {
auto range = boost::irange<size_t>(0, results.size());
if (unlikely(results.size() != responses.size())) {
// soft assert & recovery attempt
@@ -518,6 +520,13 @@ static void fill_fetch_responses(
}

resp_it->set(std::move(resp));

if (record_latency) {
std::chrono::microseconds fetch_latency
= std::chrono::duration_cast<std::chrono::microseconds>(
op_context::latency_clock::now() - start_time);
octx.rctx.probe().record_fetch_latency(fetch_latency);
}
}
}

@@ -603,6 +612,73 @@ bool shard_fetch::empty() const {
return requests.empty();
}

/**
* Top-level handler for fetching from a single shard. The result is
* unwrapped and any errors from the storage sub-system are translated
* into Kafka-specific response codes. On failure or success, the
* partition response is finalized and placed into its position in the
* response message.
*/
static ss::future<>
handle_shard_fetch(ss::shard_id shard, op_context& octx, shard_fetch fetch) {
// if over budget skip the fetch.
if (octx.bytes_left <= 0) {
return ss::now();
}
// no requests for this shard, do nothing
if (fetch.empty()) {
return ss::now();
}

const bool foreign_read = shard != ss::this_shard_id();

// dispatch to remote core
return octx.rctx.partition_manager()
.invoke_on(
shard,
octx.ssg,
[foreign_read, configs = std::move(fetch.requests), &octx](
cluster::partition_manager& mgr) mutable {
// &octx is captured only to immediately use its accessors here so
// that there is a list of all objects accessed next to `invoke_on`.
// This is meant to help avoid unintended cross-shard access.
return fetch_ntps_in_parallel(
mgr,
octx.rctx.server().local().get_replica_selector(),
std::move(configs),
foreign_read,
octx.deadline,
octx.bytes_left,
octx.rctx.server().local().memory(),
octx.rctx.server().local().memory_fetch_sem());
})
.then([responses = std::move(fetch.responses),
start_time = fetch.start_time,
&octx](std::vector<read_result> results) mutable {
fill_fetch_responses(octx, std::move(results), responses, start_time);
});
}

class parallel_fetch_plan_executor final : public fetch_plan_executor::impl {
ss::future<> execute_plan(op_context& octx, fetch_plan plan) final {
std::vector<ss::future<>> fetches;
fetches.reserve(ss::smp::count);

// start fetching from a random shard to make sure that we fetch data from
// all the partitions even if we reach the fetch message size limit
const ss::shard_id start_shard_idx = random_generators::get_int(
ss::smp::count - 1);
for (size_t i = 0; i < ss::smp::count; ++i) {
auto shard = (start_shard_idx + i) % ss::smp::count;

fetches.push_back(handle_shard_fetch(
shard, octx, std::move(plan.fetches_per_shard[shard])));
}

return ss::when_all_succeed(fetches.begin(), fetches.end());
}
};
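The executor above deliberately starts its shard sweep at a random index so that, when the byte budget runs out mid-sweep, the same shards are not systematically starved. A self-contained sketch of that rotation follows (plain integers instead of Seastar shard IDs; illustrative only):

```cpp
#include <cstddef>
#include <random>
#include <vector>

// Standalone sketch of the dispatch order used above: visit every shard
// exactly once, starting from a randomly chosen offset, so early budget
// exhaustion does not always penalize the same shards.
std::vector<size_t> rotated_shard_order(size_t shard_count) {
    std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<size_t> dist(0, shard_count - 1);
    const size_t start = dist(rng);

    std::vector<size_t> order;
    order.reserve(shard_count);
    for (size_t i = 0; i < shard_count; ++i) {
        order.push_back((start + i) % shard_count);
    }
    return order;
}
```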

class fetch_worker {
public:
// Passed from the coordinator shard to fetch workers.
@@ -1084,7 +1160,11 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
});

fill_fetch_responses(
octx, std::move(results.read_results), fetch.responses);
octx,
std::move(results.read_results),
fetch.responses,
fetch.start_time,
false);

octx.rctx.probe().record_fetch_latency(
results.first_run_latency_result);
@@ -1285,6 +1365,49 @@ class simple_fetch_planner final : public fetch_planner::impl {
}
};

/**
* Process partition fetch requests.
*
* Partition requests are handled serially, in the order they appear in the
* fetch request. There are a couple of reasons why we are not **yet** processing these in
* parallel. First, Kafka expects to some extent that the order of the
* partitions in the request is an implicit priority on which partitions to
* read from. This is closely related to the request budget limits specified
* in terms of maximum bytes and maximum time delay.
*
* Once we start processing requests in parallel we'll have to work through
* various challenges. First, once we dispatch in parallel, we'll need to
* develop heuristics for dealing with the implicit priority order. We'll
* also need to develop techniques and heuristics for dealing with budgets
* since global budgets aren't trivially divisible onto each core when
* partition requests may produce non-uniform amounts of data.
*
* w.r.t. what is needed to parallelize this, there are no data dependencies
* between partition requests within the fetch request, and so they can be
* run fully in parallel. The only dependency that exists is that the
* response must be reassembled such that the responses appear in the same
* order as the partitions in the request.
*/
static ss::future<> fetch_topic_partitions(op_context& octx) {
auto planner = make_fetch_planner<simple_fetch_planner>();

auto fetch_plan = planner.create_plan(octx);

fetch_plan_executor executor
= make_fetch_plan_executor<parallel_fetch_plan_executor>();
co_await executor.execute_plan(octx, std::move(fetch_plan));

if (octx.should_stop_fetch()) {
co_return;
}

octx.reset_context();
// debounce next read retry
co_await ss::sleep(std::min(
config::shard_local_cfg().fetch_reads_debounce_timeout(),
octx.request.data.max_wait_ms));
}
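For illustration, the debounce sleep above never waits longer than the client's own `max_wait_ms`; a minimal sketch of that cap with hypothetical values:

```cpp
#include <algorithm>
#include <chrono>

using namespace std::chrono_literals;

// Hypothetical values, for illustration only:
//   fetch_reads_debounce_timeout = 10ms (cluster config)
//   request.data.max_wait_ms     = 5ms  (per-request client setting)
constexpr auto debounce = 10ms;
constexpr auto client_max_wait = 5ms;
constexpr auto retry_delay = std::min(debounce, client_max_wait); // -> 5ms
```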

namespace testing {
kafka::fetch_plan make_simple_fetch_plan(op_context& octx) {
auto planner = make_fetch_planner<simple_fetch_planner>();
@@ -1294,11 +1417,26 @@ kafka::fetch_plan make_simple_fetch_plan(op_context& octx) {

namespace {
ss::future<> do_fetch(op_context& octx) {
auto planner = make_fetch_planner<simple_fetch_planner>();
auto fetch_plan = planner.create_plan(octx);

nonpolling_fetch_plan_executor executor;
co_await executor.execute_plan(octx, std::move(fetch_plan));
switch (config::shard_local_cfg().fetch_read_strategy) {
case model::fetch_read_strategy::polling: {
// first fetch, do not wait
co_await fetch_topic_partitions(octx).then([&octx] {
return ss::do_until(
[&octx] { return octx.should_stop_fetch(); },
[&octx] { return fetch_topic_partitions(octx); });
});
} break;
case model::fetch_read_strategy::non_polling: {
auto planner = make_fetch_planner<simple_fetch_planner>();
auto fetch_plan = planner.create_plan(octx);

nonpolling_fetch_plan_executor executor;
co_await executor.execute_plan(octx, std::move(fetch_plan));
} break;
default: {
vassert(false, "not implemented");
} break;
}
}
} // namespace

10 changes: 8 additions & 2 deletions src/v/kafka/server/handlers/fetch.h
@@ -331,6 +331,9 @@ struct read_result {
// struct aggregating fetch requests and corresponding response iterators for
// the same shard
struct shard_fetch {
explicit shard_fetch(op_context::latency_point start_time)
: start_time{start_time} {}

void push_back(
ntp_fetch_config config, op_context::response_placeholder_ptr r_ph) {
requests.push_back(std::move(config));
@@ -346,6 +349,7 @@ struct shard_fetch {
ss::shard_id shard;
std::vector<ntp_fetch_config> requests;
std::vector<op_context::response_placeholder_ptr> responses;
op_context::latency_point start_time;

friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) {
fmt::print(o, "{}", sf.requests);
@@ -354,8 +358,10 @@
};

struct fetch_plan {
explicit fetch_plan(size_t shards)
: fetches_per_shard(shards, shard_fetch()) {
explicit fetch_plan(
size_t shards,
op_context::latency_point start_time = op_context::latency_clock::now())
: fetches_per_shard(shards, shard_fetch(start_time)) {
for (size_t i = 0; i < fetches_per_shard.size(); i++) {
fetches_per_shard[i].shard = i;
}
19 changes: 19 additions & 0 deletions src/v/model/metadata.h
@@ -546,6 +546,25 @@ operator<<(std::ostream& os, cloud_storage_chunk_eviction_strategy st) {
}
}

enum class fetch_read_strategy : uint8_t {
polling = 0,
non_polling = 1,
};

constexpr const char* fetch_read_strategy_to_string(fetch_read_strategy s) {
switch (s) {
case fetch_read_strategy::polling:
return "polling";
case fetch_read_strategy::non_polling:
return "non_polling";
default:
throw std::invalid_argument("unknown fetch_read_strategy");
}
}

std::ostream& operator<<(std::ostream&, fetch_read_strategy);
std::istream& operator>>(std::istream&, fetch_read_strategy&);

/**
* Type representing MPX virtual cluster. MPX uses XID to identify clusters.
*/
18 changes: 18 additions & 0 deletions src/v/model/model.cc
@@ -485,6 +485,24 @@ std::ostream& operator<<(std::ostream& o, const batch_identity& bid) {
return o;
}

std::ostream& operator<<(std::ostream& o, fetch_read_strategy s) {
o << fetch_read_strategy_to_string(s);
return o;
}

std::istream& operator>>(std::istream& i, fetch_read_strategy& strat) {
ss::sstring s;
i >> s;
strat = string_switch<fetch_read_strategy>(s)
.match(
fetch_read_strategy_to_string(fetch_read_strategy::polling),
fetch_read_strategy::polling)
.match(
fetch_read_strategy_to_string(fetch_read_strategy::non_polling),
fetch_read_strategy::non_polling);
return i;
}
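A tiny round-trip check for the stream operators above (test-style sketch, not part of the diff):

```cpp
#include <cassert>
#include <sstream>

#include "model/metadata.h"

// Illustrative only: operator<< writes the canonical string ("non_polling"),
// and operator>> parses it back into the enum.
void fetch_read_strategy_roundtrip() {
    std::stringstream ss;
    ss << model::fetch_read_strategy::non_polling;

    model::fetch_read_strategy parsed{};
    ss >> parsed;
    assert(parsed == model::fetch_read_strategy::non_polling);
}
```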

std::ostream& operator<<(std::ostream& o, write_caching_mode mode) {
o << write_caching_mode_to_string(mode);
return o;