Skip to content

Commit

Permalink
Merge pull request #18176 from vbotbuildovich/backport-pr-18090-v24.1…
Browse files Browse the repository at this point in the history
….x-878

[v24.1.x] Stephan/revert polling removal
  • Loading branch information
StephanDollberg authored May 1, 2024
2 parents e55c0cb + f89d752 commit 60d0cdc
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 11 deletions.
14 changes: 13 additions & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,19 @@ configuration::configuration()
"wasn't reached",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
1ms)
, fetch_read_strategy(*this, "fetch_read_strategy")
, fetch_read_strategy(
*this,
"fetch_read_strategy",
"The strategy used to fulfill fetch requests",
{.needs_restart = needs_restart::no,
.example = model::fetch_read_strategy_to_string(
model::fetch_read_strategy::non_polling),
.visibility = visibility::tunable},
model::fetch_read_strategy::non_polling,
{
model::fetch_read_strategy::polling,
model::fetch_read_strategy::non_polling,
})
, alter_topic_cfg_timeout_ms(
*this,
"alter_topic_cfg_timeout_ms",
Expand Down
2 changes: 1 addition & 1 deletion src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ struct configuration final : public config_store {
property<std::chrono::milliseconds> tx_timeout_delay_ms;
deprecated_property rm_violation_recovery_policy;
property<std::chrono::milliseconds> fetch_reads_debounce_timeout;
deprecated_property fetch_read_strategy;
enum_property<model::fetch_read_strategy> fetch_read_strategy;
property<std::chrono::milliseconds> alter_topic_cfg_timeout_ms;
property<model::cleanup_policy_bitflags> log_cleanup_policy;
enum_property<model::timestamp_type> log_message_timestamp_type;
Expand Down
31 changes: 31 additions & 0 deletions src/v/config/convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,37 @@ struct convert<pandaproxy::schema_registry::schema_id_validation_mode> {
}
};

/// YAML conversion for model::fetch_read_strategy.
///
/// encode() writes the fmt-formatted value into a node; decode() accepts only
/// the strings listed in `acceptable_values`.
template<>
struct convert<model::fetch_read_strategy> {
    using type = model::fetch_read_strategy;

    /// String forms that decode() recognises.
    static constexpr auto acceptable_values = std::to_array(
      {model::fetch_read_strategy_to_string(type::polling),
       model::fetch_read_strategy_to_string(type::non_polling)});

    /// Builds a node from the "{}"-formatted strategy.
    static Node encode(const type& rhs) {
        return Node(fmt::format("{}", rhs));
    }

    /// Reads the node as a string and, if it is one of `acceptable_values`,
    /// stores the matching enumerator in `rhs`.
    ///
    /// @return true when `rhs` was assigned, false when the string is not an
    ///         accepted value (in which case `rhs` is left untouched).
    static bool decode(const Node& node, type& rhs) {
        const auto text = node.as<std::string>();

        const auto last = acceptable_values.end();
        const bool recognised
          = std::find(acceptable_values.begin(), last, text) != last;

        if (recognised) {
            rhs = string_switch<type>(std::string_view{text})
                    .match(
                      model::fetch_read_strategy_to_string(type::polling),
                      type::polling)
                    .match(
                      model::fetch_read_strategy_to_string(type::non_polling),
                      type::non_polling);
        }

        return recognised;
    }
};

template<>
struct convert<model::write_caching_mode> {
using type = model::write_caching_mode;
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,8 @@ consteval std::string_view property_type_name() {
pandaproxy::schema_registry::
schema_id_validation_mode>) {
return "string";
} else if constexpr (std::is_same_v<type, model::fetch_read_strategy>) {
return "string";
} else if constexpr (std::is_same_v<type, model::write_caching_mode>) {
return "string";
} else if constexpr (std::
Expand Down
5 changes: 5 additions & 0 deletions src/v/config/rjson_serialization.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ void rjson_serialize(
stringize(w, v);
}

// Serializes a model::fetch_read_strategy into the JSON writer `w` by
// delegating to stringize().
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::fetch_read_strategy& v) {
stringize(w, v);
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::write_caching_mode& v) {
stringize(w, v);
Expand Down
3 changes: 3 additions & 0 deletions src/v/config/rjson_serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ void rjson_serialize(
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::leader_balancer_mode& v);

// JSON serialization overload for model::fetch_read_strategy (declaration).
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::fetch_read_strategy& v);

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::write_caching_mode& v);

Expand Down
152 changes: 145 additions & 7 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,9 @@ read_result::memory_units_t reserve_memory_units(
static void fill_fetch_responses(
op_context& octx,
std::vector<read_result> results,
const std::vector<op_context::response_placeholder_ptr>& responses) {
const std::vector<op_context::response_placeholder_ptr>& responses,
op_context::latency_point start_time,
bool record_latency = true) {
auto range = boost::irange<size_t>(0, results.size());
if (unlikely(results.size() != responses.size())) {
// soft assert & recovery attempt
Expand Down Expand Up @@ -518,6 +520,13 @@ static void fill_fetch_responses(
}

resp_it->set(std::move(resp));

if (record_latency) {
std::chrono::microseconds fetch_latency
= std::chrono::duration_cast<std::chrono::microseconds>(
op_context::latency_clock::now() - start_time);
octx.rctx.probe().record_fetch_latency(fetch_latency);
}
}
}

Expand Down Expand Up @@ -603,6 +612,73 @@ bool shard_fetch::empty() const {
return requests.empty();
}

/**
 * Runs the fetch for a single shard and stores the outcome in that shard's
 * response placeholders.
 *
 * Returns an already-resolved future when the request has no byte budget
 * left or when the plan holds no requests for `shard`. Otherwise the reads
 * are executed on `shard` through the partition manager, and the resulting
 * read_results are handed to fill_fetch_responses together with the start
 * time recorded in `fetch`.
 */
static ss::future<>
handle_shard_fetch(ss::shard_id shard, op_context& octx, shard_fetch fetch) {
    // Skip the fetch when over budget or when this shard has nothing to read.
    if (octx.bytes_left <= 0 || fetch.empty()) {
        return ss::now();
    }

    const bool foreign_read = ss::this_shard_id() != shard;

    // Pull everything needed out of `fetch` up front so each continuation
    // owns exactly the pieces it uses.
    auto shard_requests = std::move(fetch.requests);
    auto placeholders = std::move(fetch.responses);
    const auto started_at = fetch.start_time;

    auto read_on_shard =
      [foreign_read, configs = std::move(shard_requests), &octx](
        cluster::partition_manager& mgr) mutable {
          // &octx is captured only to immediately use its accessors here so
          // that there is a list of all objects accessed next to `invoke_on`.
          // This is meant to help avoiding unintended cross shard access
          return fetch_ntps_in_parallel(
            mgr,
            octx.rctx.server().local().get_replica_selector(),
            std::move(configs),
            foreign_read,
            octx.deadline,
            octx.bytes_left,
            octx.rctx.server().local().memory(),
            octx.rctx.server().local().memory_fetch_sem());
      };

    auto store_results = [responses = std::move(placeholders),
                          start_time = started_at,
                          &octx](std::vector<read_result> results) mutable {
        fill_fetch_responses(octx, std::move(results), responses, start_time);
    };

    // dispatch to the target core, then fill in the responses
    return octx.rctx.partition_manager()
      .invoke_on(shard, octx.ssg, std::move(read_on_shard))
      .then(std::move(store_results));
}

/// Plan executor that issues handle_shard_fetch for every shard and completes
/// once all of the per-shard futures have succeeded.
class parallel_fetch_plan_executor final : public fetch_plan_executor::impl {
    ss::future<> execute_plan(op_context& octx, fetch_plan plan) final {
        const auto shard_count = ss::smp::count;

        std::vector<ss::future<>> pending;
        pending.reserve(shard_count);

        // Begin at a randomly chosen shard so that every partition gets a
        // chance to be read even when the fetch message size limit is hit.
        const ss::shard_id first_shard = random_generators::get_int(
          shard_count - 1);

        for (size_t offset = 0; offset < shard_count; ++offset) {
            const auto shard = (first_shard + offset) % shard_count;
            pending.push_back(handle_shard_fetch(
              shard, octx, std::move(plan.fetches_per_shard[shard])));
        }

        return ss::when_all_succeed(pending.begin(), pending.end());
    }
};

class fetch_worker {
public:
// Passed from the coordinator shard to fetch workers.
Expand Down Expand Up @@ -1084,7 +1160,11 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
});

fill_fetch_responses(
octx, std::move(results.read_results), fetch.responses);
octx,
std::move(results.read_results),
fetch.responses,
fetch.start_time,
false);

octx.rctx.probe().record_fetch_latency(
results.first_run_latency_result);
Expand Down Expand Up @@ -1285,6 +1365,49 @@ class simple_fetch_planner final : public fetch_planner::impl {
}
};

/**
 * Performs one fetch pass for the request held in `octx`.
 *
 * A plan is built with simple_fetch_planner and run through
 * parallel_fetch_plan_executor. If octx.should_stop_fetch() is true once the
 * plan has run, the pass ends there. Otherwise the context is reset and the
 * coroutine sleeps before returning, so that the next read attempt is
 * debounced. The sleep lasts for the smaller of the
 * fetch_reads_debounce_timeout setting and the request's max_wait_ms.
 *
 * NOTE(review): earlier commentary on this function described partition
 * requests as being handled serially, with request order acting as an
 * implicit priority for the byte and time budgets, and required responses to
 * appear in the same order as the partitions in the request. This body
 * dispatches through parallel_fetch_plan_executor -- confirm which of those
 * expectations still hold.
 */
static ss::future<> fetch_topic_partitions(op_context& octx) {
    auto plan_builder = make_fetch_planner<simple_fetch_planner>();
    auto plan = plan_builder.create_plan(octx);

    fetch_plan_executor executor
      = make_fetch_plan_executor<parallel_fetch_plan_executor>();
    co_await executor.execute_plan(octx, std::move(plan));

    if (!octx.should_stop_fetch()) {
        octx.reset_context();

        // debounce next read retry
        const auto debounce = std::min(
          config::shard_local_cfg().fetch_reads_debounce_timeout(),
          octx.request.data.max_wait_ms);
        co_await ss::sleep(debounce);
    }
}

namespace testing {
kafka::fetch_plan make_simple_fetch_plan(op_context& octx) {
auto planner = make_fetch_planner<simple_fetch_planner>();
Expand All @@ -1294,11 +1417,26 @@ kafka::fetch_plan make_simple_fetch_plan(op_context& octx) {

namespace {
// Serves a fetch according to the fetch_read_strategy configuration property.
//
//  - polling: runs fetch_topic_partitions once, then keeps calling it until
//    octx.should_stop_fetch() returns true.
//  - non_polling: builds a plan with simple_fetch_planner and runs it through
//    nonpolling_fetch_plan_executor.
//  - any other value: trips vassert with "not implemented".
//
// NOTE(review): the four statements directly before the switch (planner,
// fetch_plan, executor, co_await) duplicate the non_polling branch and look
// like the pre-change body carried over by the diff rendering -- confirm they
// are not present in the actual file.
ss::future<> do_fetch(op_context& octx) {
auto planner = make_fetch_planner<simple_fetch_planner>();
auto fetch_plan = planner.create_plan(octx);

nonpolling_fetch_plan_executor executor;
co_await executor.execute_plan(octx, std::move(fetch_plan));
switch (config::shard_local_cfg().fetch_read_strategy) {
case model::fetch_read_strategy::polling: {
// first fetch, do not wait
co_await fetch_topic_partitions(octx).then([&octx] {
// keep re-running the fetch pass until the stop condition is met
return ss::do_until(
[&octx] { return octx.should_stop_fetch(); },
[&octx] { return fetch_topic_partitions(octx); });
});
} break;
case model::fetch_read_strategy::non_polling: {
auto planner = make_fetch_planner<simple_fetch_planner>();
auto fetch_plan = planner.create_plan(octx);

nonpolling_fetch_plan_executor executor;
co_await executor.execute_plan(octx, std::move(fetch_plan));
} break;
default: {
// reached only for a strategy value without a case above
vassert(false, "not implemented");
} break;
}
}
} // namespace

Expand Down
10 changes: 8 additions & 2 deletions src/v/kafka/server/handlers/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ struct read_result {
// struct aggregating fetch requests and corresponding response iterators for
// the same shard
struct shard_fetch {
explicit shard_fetch(op_context::latency_point start_time)
: start_time{start_time} {}

void push_back(
ntp_fetch_config config, op_context::response_placeholder_ptr r_ph) {
requests.push_back(std::move(config));
Expand All @@ -346,6 +349,7 @@ struct shard_fetch {
ss::shard_id shard;
std::vector<ntp_fetch_config> requests;
std::vector<op_context::response_placeholder_ptr> responses;
op_context::latency_point start_time;

friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) {
fmt::print(o, "{}", sf.requests);
Expand All @@ -354,8 +358,10 @@ struct shard_fetch {
};

struct fetch_plan {
explicit fetch_plan(size_t shards)
: fetches_per_shard(shards, shard_fetch()) {
explicit fetch_plan(
size_t shards,
op_context::latency_point start_time = op_context::latency_clock::now())
: fetches_per_shard(shards, shard_fetch(start_time)) {
for (size_t i = 0; i < fetches_per_shard.size(); i++) {
fetches_per_shard[i].shard = i;
}
Expand Down
19 changes: 19 additions & 0 deletions src/v/model/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,25 @@ operator<<(std::ostream& os, cloud_storage_chunk_eviction_strategy st) {
}
}

/// Strategy used to fulfill fetch requests. Enumerator values are assigned
/// explicitly.
enum class fetch_read_strategy : uint8_t {
    polling = 0,
    non_polling = 1,
};

/// Returns the string name of a fetch_read_strategy.
///
/// @param s strategy to name
/// @return "polling" or "non_polling"
/// @throws std::invalid_argument when `s` is neither enumerator
constexpr const char* fetch_read_strategy_to_string(fetch_read_strategy s) {
    if (s == fetch_read_strategy::polling) {
        return "polling";
    }
    if (s == fetch_read_strategy::non_polling) {
        return "non_polling";
    }
    throw std::invalid_argument("unknown fetch_read_strategy");
}

// Stream insertion and extraction for fetch_read_strategy (declarations).
std::ostream& operator<<(std::ostream&, fetch_read_strategy);
std::istream& operator>>(std::istream&, fetch_read_strategy&);

/**
* Type representing MPX virtual cluster. MPX uses XID to identify clusters.
*/
Expand Down
18 changes: 18 additions & 0 deletions src/v/model/model.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,24 @@ std::ostream& operator<<(std::ostream& o, const batch_identity& bid) {
return o;
}

/// Writes the string returned by fetch_read_strategy_to_string(s) to `o`.
std::ostream& operator<<(std::ostream& o, fetch_read_strategy s) {
    return o << fetch_read_strategy_to_string(s);
}

std::istream& operator>>(std::istream& i, fetch_read_strategy& strat) {
ss::sstring s;
i >> s;
strat = string_switch<fetch_read_strategy>(s)
.match(
fetch_read_strategy_to_string(fetch_read_strategy::polling),
fetch_read_strategy::polling)
.match(
fetch_read_strategy_to_string(fetch_read_strategy::non_polling),
fetch_read_strategy::non_polling);
return i;
}

std::ostream& operator<<(std::ostream& o, write_caching_mode mode) {
o << write_caching_mode_to_string(mode);
return o;
Expand Down

0 comments on commit 60d0cdc

Please sign in to comment.