Skip to content

Commit

Permalink
Revert "treewide: remove old fetch impl and mark fetch_read_strategy …
Browse files Browse the repository at this point in the history
…as deprecated"

This reverts commit c735156.

There is scenarios where polling with high debounce can be used to limit
fetch perf impact (at the cost of high latency).

(cherry picked from commit 03a9f27)
  • Loading branch information
StephanDollberg authored and vbotbuildovich committed Apr 30, 2024
1 parent ccf9636 commit 568c78f
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 7 deletions.
14 changes: 13 additions & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,19 @@ configuration::configuration()
"wasn't reached",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
1ms)
, fetch_read_strategy(*this, "fetch_read_strategy")
, fetch_read_strategy(
*this,
"fetch_read_strategy",
"The strategy used to fulfill fetch requests",
{.needs_restart = needs_restart::no,
.example = model::fetch_read_strategy_to_string(
model::fetch_read_strategy::non_polling),
.visibility = visibility::tunable},
model::fetch_read_strategy::non_polling,
{
model::fetch_read_strategy::polling,
model::fetch_read_strategy::non_polling,
})
, alter_topic_cfg_timeout_ms(
*this,
"alter_topic_cfg_timeout_ms",
Expand Down
2 changes: 1 addition & 1 deletion src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ struct configuration final : public config_store {
property<std::chrono::milliseconds> tx_timeout_delay_ms;
deprecated_property rm_violation_recovery_policy;
property<std::chrono::milliseconds> fetch_reads_debounce_timeout;
deprecated_property fetch_read_strategy;
enum_property<model::fetch_read_strategy> fetch_read_strategy;
property<std::chrono::milliseconds> alter_topic_cfg_timeout_ms;
property<model::cleanup_policy_bitflags> log_cleanup_policy;
enum_property<model::timestamp_type> log_message_timestamp_type;
Expand Down
31 changes: 31 additions & 0 deletions src/v/config/convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,37 @@ struct convert<pandaproxy::schema_registry::schema_id_validation_mode> {
}
};

template<>
struct convert<model::fetch_read_strategy> {
using type = model::fetch_read_strategy;

static constexpr auto acceptable_values = std::to_array(
{model::fetch_read_strategy_to_string(type::polling),
model::fetch_read_strategy_to_string(type::non_polling)});

static Node encode(const type& rhs) { return Node(fmt::format("{}", rhs)); }

static bool decode(const Node& node, type& rhs) {
auto value = node.as<std::string>();

if (
std::find(acceptable_values.begin(), acceptable_values.end(), value)
== acceptable_values.end()) {
return false;
}

rhs = string_switch<type>(std::string_view{value})
.match(
model::fetch_read_strategy_to_string(type::polling),
type::polling)
.match(
model::fetch_read_strategy_to_string(type::non_polling),
type::non_polling);

return true;
}
};

template<>
struct convert<model::write_caching_mode> {
using type = model::write_caching_mode;
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,8 @@ consteval std::string_view property_type_name() {
pandaproxy::schema_registry::
schema_id_validation_mode>) {
return "string";
} else if constexpr (std::is_same_v<type, model::fetch_read_strategy>) {
return "string";
} else if constexpr (std::is_same_v<type, model::write_caching_mode>) {
return "string";
} else if constexpr (std::
Expand Down
5 changes: 5 additions & 0 deletions src/v/config/rjson_serialization.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ void rjson_serialize(
stringize(w, v);
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::fetch_read_strategy& v) {
stringize(w, v);
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::write_caching_mode& v) {
stringize(w, v);
Expand Down
3 changes: 3 additions & 0 deletions src/v/config/rjson_serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ void rjson_serialize(
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::leader_balancer_mode& v);

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::fetch_read_strategy& v);

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::write_caching_mode& v);

Expand Down
135 changes: 130 additions & 5 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,73 @@ bool shard_fetch::empty() const {
return requests.empty();
}

/**
* Top-level handler for fetching from single shard. The result is
* unwrapped and any errors from the storage sub-system are translated
* into kafka specific response codes. On failure or success the
* partition response is finalized and placed into its position in the
* response message.
*/
static ss::future<>
handle_shard_fetch(ss::shard_id shard, op_context& octx, shard_fetch fetch) {
// if over budget skip the fetch.
if (octx.bytes_left <= 0) {
return ss::now();
}
// no requests for this shard, do nothing
if (fetch.empty()) {
return ss::now();
}

const bool foreign_read = shard != ss::this_shard_id();

// dispatch to remote core
return octx.rctx.partition_manager()
.invoke_on(
shard,
octx.ssg,
[foreign_read, configs = std::move(fetch.requests), &octx](
cluster::partition_manager& mgr) mutable {
// &octx is captured only to immediately use its accessors here so
// that there is a list of all objects accessed next to `invoke_on`.
// This is meant to help avoiding unintended cross shard access
return fetch_ntps_in_parallel(
mgr,
octx.rctx.server().local().get_replica_selector(),
std::move(configs),
foreign_read,
octx.deadline,
octx.bytes_left,
octx.rctx.server().local().memory(),
octx.rctx.server().local().memory_fetch_sem());
})
.then([responses = std::move(fetch.responses),
start_time = fetch.start_time,
&octx](std::vector<read_result> results) mutable {
fill_fetch_responses(octx, std::move(results), responses, start_time);
});
}

class parallel_fetch_plan_executor final : public fetch_plan_executor::impl {
ss::future<> execute_plan(op_context& octx, fetch_plan plan) final {
std::vector<ss::future<>> fetches;
fetches.reserve(ss::smp::count);

// start fetching from random shard to make sure that we fetch data from
// all the partition even if we reach fetch message size limit
const ss::shard_id start_shard_idx = random_generators::get_int(
ss::smp::count - 1);
for (size_t i = 0; i < ss::smp::count; ++i) {
auto shard = (start_shard_idx + i) % ss::smp::count;

fetches.push_back(handle_shard_fetch(
shard, octx, std::move(plan.fetches_per_shard[shard])));
}

return ss::when_all_succeed(fetches.begin(), fetches.end());
}
};

class fetch_worker {
public:
// Passed from the coordinator shard to fetch workers.
Expand Down Expand Up @@ -1285,6 +1352,49 @@ class simple_fetch_planner final : public fetch_planner::impl {
}
};

/**
* Process partition fetch requests.
*
* Each request is handled serially in the order they appear in the request.
* There are a couple reasons why we are not **yet** processing these in
* parallel. First, Kafka expects to some extent that the order of the
* partitions in the request is an implicit priority on which partitions to
* read from. This is closely related to the request budget limits specified
* in terms of maximum bytes and maximum time delay.
*
* Once we start processing requests in parallel we'll have to work through
* various challenges. First, once we dispatch in parallel, we'll need to
* develop heuristics for dealing with the implicit priority order. We'll
* also need to develop techniques and heuristics for dealing with budgets
* since global budgets aren't trivially divisible onto each core when
* partition requests may produce non-uniform amounts of data.
*
* w.r.t. what is needed to parallelize this, there are no data dependencies
* between partition requests within the fetch request, and so they can be
* run fully in parallel. The only dependency that exists is that the
* response must be reassembled such that the responses appear in these
* order as the partitions in the request.
*/
static ss::future<> fetch_topic_partitions(op_context& octx) {
auto planner = make_fetch_planner<simple_fetch_planner>();

auto fetch_plan = planner.create_plan(octx);

fetch_plan_executor executor
= make_fetch_plan_executor<parallel_fetch_plan_executor>();
co_await executor.execute_plan(octx, std::move(fetch_plan));

if (octx.should_stop_fetch()) {
co_return;
}

octx.reset_context();
// debounce next read retry
co_await ss::sleep(std::min(
config::shard_local_cfg().fetch_reads_debounce_timeout(),
octx.request.data.max_wait_ms));
}

namespace testing {
kafka::fetch_plan make_simple_fetch_plan(op_context& octx) {
auto planner = make_fetch_planner<simple_fetch_planner>();
Expand All @@ -1294,11 +1404,26 @@ kafka::fetch_plan make_simple_fetch_plan(op_context& octx) {

namespace {
ss::future<> do_fetch(op_context& octx) {
auto planner = make_fetch_planner<simple_fetch_planner>();
auto fetch_plan = planner.create_plan(octx);

nonpolling_fetch_plan_executor executor;
co_await executor.execute_plan(octx, std::move(fetch_plan));
switch (config::shard_local_cfg().fetch_read_strategy) {
case model::fetch_read_strategy::polling: {
// first fetch, do not wait
co_await fetch_topic_partitions(octx).then([&octx] {
return ss::do_until(
[&octx] { return octx.should_stop_fetch(); },
[&octx] { return fetch_topic_partitions(octx); });
});
} break;
case model::fetch_read_strategy::non_polling: {
auto planner = make_fetch_planner<simple_fetch_planner>();
auto fetch_plan = planner.create_plan(octx);

nonpolling_fetch_plan_executor executor;
co_await executor.execute_plan(octx, std::move(fetch_plan));
} break;
default: {
vassert(false, "not implemented");
} break;
}
}
} // namespace

Expand Down
19 changes: 19 additions & 0 deletions src/v/model/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,25 @@ operator<<(std::ostream& os, cloud_storage_chunk_eviction_strategy st) {
}
}

enum class fetch_read_strategy : uint8_t {
polling = 0,
non_polling = 1,
};

constexpr const char* fetch_read_strategy_to_string(fetch_read_strategy s) {
switch (s) {
case fetch_read_strategy::polling:
return "polling";
case fetch_read_strategy::non_polling:
return "non_polling";
default:
throw std::invalid_argument("unknown fetch_read_strategy");
}
}

std::ostream& operator<<(std::ostream&, fetch_read_strategy);
std::istream& operator>>(std::istream&, fetch_read_strategy&);

/**
* Type representing MPX virtual cluster. MPX uses XID to identify clusters.
*/
Expand Down
18 changes: 18 additions & 0 deletions src/v/model/model.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,24 @@ std::ostream& operator<<(std::ostream& o, const batch_identity& bid) {
return o;
}

std::ostream& operator<<(std::ostream& o, fetch_read_strategy s) {
o << fetch_read_strategy_to_string(s);
return o;
}

std::istream& operator>>(std::istream& i, fetch_read_strategy& strat) {
ss::sstring s;
i >> s;
strat = string_switch<fetch_read_strategy>(s)
.match(
fetch_read_strategy_to_string(fetch_read_strategy::polling),
fetch_read_strategy::polling)
.match(
fetch_read_strategy_to_string(fetch_read_strategy::non_polling),
fetch_read_strategy::non_polling);
return i;
}

std::ostream& operator<<(std::ostream& o, write_caching_mode mode) {
o << write_caching_mode_to_string(mode);
return o;
Expand Down

0 comments on commit 568c78f

Please sign in to comment.