treewide: remove old fetch impl and mark fetch_read_strategy as deprecated

ballard26 committed Mar 19, 2024
1 parent 4ad77c3 commit c735156
Showing 9 changed files with 7 additions and 233 deletions.
14 changes: 1 addition & 13 deletions src/v/config/configuration.cc
@@ -618,19 +618,7 @@ configuration::configuration()
"wasn't reached",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
1ms)
-    , fetch_read_strategy(
-        *this,
-        "fetch_read_strategy",
-        "The strategy used to fulfill fetch requests",
-        {.needs_restart = needs_restart::no,
-         .example = model::fetch_read_strategy_to_string(
-           model::fetch_read_strategy::non_polling),
-         .visibility = visibility::tunable},
-        model::fetch_read_strategy::non_polling,
-        {
-          model::fetch_read_strategy::polling,
-          model::fetch_read_strategy::non_polling,
-        })
+    , fetch_read_strategy(*this, "fetch_read_strategy")
, alter_topic_cfg_timeout_ms(
*this,
"alter_topic_cfg_timeout_ms",
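
For reference, the deprecated_property pattern keeps old config files and cluster configs parseable while the stored value is ignored. A minimal sketch of the idea — type and member names here are assumptions for illustration, not Redpanda's actual implementation:

#include <string>
#include <string_view>

// Sketch: a config entry that accepts any value and discards it, so
// clusters that still set "fetch_read_strategy" keep starting cleanly.
// Hypothetical type; not the real config::deprecated_property.
class deprecated_property_sketch {
public:
    explicit deprecated_property_sketch(std::string_view name)
      : _name(name) {}

    // Accept whatever the config file contains; the value is dropped.
    void set_value(std::string_view /*ignored*/) {}

    bool is_deprecated() const { return true; }

private:
    std::string _name; // key is kept only so old configs still parse
};
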
2 changes: 1 addition & 1 deletion src/v/config/configuration.h
@@ -143,7 +143,7 @@ struct configuration final : public config_store {
property<std::chrono::milliseconds> tx_timeout_delay_ms;
deprecated_property rm_violation_recovery_policy;
property<std::chrono::milliseconds> fetch_reads_debounce_timeout;
-    enum_property<model::fetch_read_strategy> fetch_read_strategy;
+    deprecated_property fetch_read_strategy;
property<std::chrono::milliseconds> alter_topic_cfg_timeout_ms;
property<model::cleanup_policy_bitflags> log_cleanup_policy;
enum_property<model::timestamp_type> log_message_timestamp_type;
31 changes: 0 additions & 31 deletions src/v/config/convert.h
@@ -500,37 +500,6 @@ struct convert<pandaproxy::schema_registry::schema_id_validation_mode> {
}
};

-template<>
-struct convert<model::fetch_read_strategy> {
-    using type = model::fetch_read_strategy;
-
-    static constexpr auto acceptable_values = std::to_array(
-      {model::fetch_read_strategy_to_string(type::polling),
-       model::fetch_read_strategy_to_string(type::non_polling)});
-
-    static Node encode(const type& rhs) { return Node(fmt::format("{}", rhs)); }
-
-    static bool decode(const Node& node, type& rhs) {
-        auto value = node.as<std::string>();
-
-        if (
-          std::find(acceptable_values.begin(), acceptable_values.end(), value)
-          == acceptable_values.end()) {
-            return false;
-        }
-
-        rhs = string_switch<type>(std::string_view{value})
-                .match(
-                  model::fetch_read_strategy_to_string(type::polling),
-                  type::polling)
-                .match(
-                  model::fetch_read_strategy_to_string(type::non_polling),
-                  type::non_polling);
-
-        return true;
-    }
-};

template<>
struct convert<model::write_caching_mode> {
using type = model::write_caching_mode;
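
Both this removal and the one in model.cc below lean on the string_switch builder. A minimal sketch of its assumed semantics — yield the value paired with the first matching string, converting implicitly to the result type; the real utility lives elsewhere in the tree:

#include <optional>
#include <stdexcept>
#include <string_view>

// Hypothetical reimplementation for illustration only.
template<typename T>
class string_switch_sketch {
public:
    explicit string_switch_sketch(std::string_view input)
      : _input(input) {}

    // Record the value of the first candidate equal to the input.
    string_switch_sketch& match(std::string_view candidate, T value) {
        if (!_result && _input == candidate) {
            _result = value;
        }
        return *this;
    }

    // Converting to T yields the match, or throws if nothing matched.
    operator T() const {
        if (!_result) {
            throw std::runtime_error("string_switch: no match");
        }
        return *_result;
    }

private:
    std::string_view _input;
    std::optional<T> _result;
};

// usage: int v = string_switch_sketch<int>("b").match("a", 1).match("b", 2);
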
2 changes: 0 additions & 2 deletions src/v/config/property.h
@@ -641,8 +641,6 @@ consteval std::string_view property_type_name() {
pandaproxy::schema_registry::
schema_id_validation_mode>) {
return "string";
-    } else if constexpr (std::is_same_v<type, model::fetch_read_strategy>) {
-        return "string";
} else if constexpr (std::is_same_v<type, model::write_caching_mode>) {
return "string";
} else {
5 changes: 0 additions & 5 deletions src/v/config/rjson_serialization.cc
@@ -160,11 +160,6 @@ void rjson_serialize(
stringize(w, v);
}

-void rjson_serialize(
-  json::Writer<json::StringBuffer>& w, const model::fetch_read_strategy& v) {
-    stringize(w, v);
-}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::write_caching_mode& v) {
stringize(w, v);
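
A guess at the shape of the shared stringize helper these overloads call: format the value and emit it as a JSON string. Assumed for illustration, not the actual helper:

#include <fmt/format.h>

// Hypothetical sketch of stringize; relies on the value having a fmt
// formatter (or an operator<< bridged via fmt/ostream.h).
template<typename Writer, typename T>
void stringize_sketch(Writer& w, const T& v) {
    auto s = fmt::format("{}", v);
    w.String(s.data(), static_cast<unsigned>(s.size())); // rapidjson-style
}
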
3 changes: 0 additions & 3 deletions src/v/config/rjson_serialization.h
@@ -76,9 +76,6 @@ void rjson_serialize(
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::leader_balancer_mode& v);

-void rjson_serialize(
-  json::Writer<json::StringBuffer>& w, const model::fetch_read_strategy& v);

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const model::write_caching_mode& v);

146 changes: 5 additions & 141 deletions src/v/kafka/server/handlers/fetch.cc
@@ -608,73 +608,6 @@ bool shard_fetch::empty() const {
return requests.empty();
}

-/**
- * Top-level handler for fetching from a single shard. The result is
- * unwrapped and any errors from the storage sub-system are translated
- * into kafka specific response codes. On failure or success the
- * partition response is finalized and placed into its position in the
- * response message.
- */
-static ss::future<>
-handle_shard_fetch(ss::shard_id shard, op_context& octx, shard_fetch fetch) {
-    // if over budget skip the fetch.
-    if (octx.bytes_left <= 0) {
-        return ss::now();
-    }
-    // no requests for this shard, do nothing
-    if (fetch.empty()) {
-        return ss::now();
-    }
-
-    const bool foreign_read = shard != ss::this_shard_id();
-
-    // dispatch to remote core
-    return octx.rctx.partition_manager()
-      .invoke_on(
-        shard,
-        octx.ssg,
-        [foreign_read, configs = std::move(fetch.requests), &octx](
-          cluster::partition_manager& mgr) mutable {
-            // &octx is captured only to immediately use its accessors here so
-            // that there is a list of all objects accessed next to `invoke_on`.
-            // This is meant to help avoid unintended cross-shard access
-            return fetch_ntps_in_parallel(
-              mgr,
-              octx.rctx.server().local().get_replica_selector(),
-              std::move(configs),
-              foreign_read,
-              octx.deadline,
-              octx.bytes_left,
-              octx.rctx.server().local().memory(),
-              octx.rctx.server().local().memory_fetch_sem());
-        })
-      .then([responses = std::move(fetch.responses),
-             start_time = fetch.start_time,
-             &octx](std::vector<read_result> results) mutable {
-          fill_fetch_responses(octx, std::move(results), responses, start_time);
-      });
-}
-
-class parallel_fetch_plan_executor final : public fetch_plan_executor::impl {
-    ss::future<> execute_plan(op_context& octx, fetch_plan plan) final {
-        std::vector<ss::future<>> fetches;
-        fetches.reserve(ss::smp::count);
-
-        // start fetching from a random shard to make sure that we fetch data
-        // from all the partitions even if we reach fetch message size limit
-        const ss::shard_id start_shard_idx = random_generators::get_int(
-          ss::smp::count - 1);
-        for (size_t i = 0; i < ss::smp::count; ++i) {
-            auto shard = (start_shard_idx + i) % ss::smp::count;
-
-            fetches.push_back(handle_shard_fetch(
-              shard, octx, std::move(plan.fetches_per_shard[shard])));
-        }
-
-        return ss::when_all_succeed(fetches.begin(), fetches.end());
-    }
-};

class fetch_worker {
public:
// Passed from the coordinator shard to fetch workers.
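The removed parallel_fetch_plan_executor dispatched shard fetches starting at a random shard and wrapping around, so the fetch byte budget was not always drained by the same low-numbered shards. A standalone sketch of that iteration pattern, with function and parameter names invented for illustration:

#include <cstddef>
#include <random>

// Visit every shard exactly once, starting from a random index, so no
// shard is systematically favored when the budget runs out early.
template<typename DispatchFn>
void dispatch_from_random_start(std::size_t shard_count, DispatchFn dispatch) {
    std::mt19937 gen{std::random_device{}()};
    std::uniform_int_distribution<std::size_t> dist(0, shard_count - 1);
    const std::size_t start = dist(gen);
    for (std::size_t i = 0; i < shard_count; ++i) {
        dispatch((start + i) % shard_count);
    }
}
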
@@ -1348,60 +1281,6 @@ class simple_fetch_planner final : public fetch_planner::impl {
}
};

-/**
- * Process partition fetch requests.
- *
- * Each request is handled serially in the order they appear in the request.
- * There are a couple reasons why we are not **yet** processing these in
- * parallel. First, Kafka expects to some extent that the order of the
- * partitions in the request is an implicit priority on which partitions to
- * read from. This is closely related to the request budget limits specified
- * in terms of maximum bytes and maximum time delay.
- *
- * Once we start processing requests in parallel we'll have to work through
- * various challenges. First, once we dispatch in parallel, we'll need to
- * develop heuristics for dealing with the implicit priority order. We'll
- * also need to develop techniques and heuristics for dealing with budgets
- * since global budgets aren't trivially divisible onto each core when
- * partition requests may produce non-uniform amounts of data.
- *
- * w.r.t. what is needed to parallelize this, there are no data dependencies
- * between partition requests within the fetch request, and so they can be
- * run fully in parallel. The only dependency that exists is that the
- * response must be reassembled such that the responses appear in the same
- * order as the partitions in the request.
- */
-static ss::future<> fetch_topic_partitions(op_context& octx) {
-    auto bytes_left_before = octx.bytes_left;
-
-    auto start_time = latency_probe::hist_t::clock_type::now();
-    auto planner = make_fetch_planner<simple_fetch_planner>();
-
-    auto fetch_plan = planner.create_plan(octx);
-
-    fetch_plan_executor executor
-      = make_fetch_plan_executor<parallel_fetch_plan_executor>();
-    co_await executor.execute_plan(octx, std::move(fetch_plan));
-
-    auto end_time = latency_probe::hist_t::clock_type::now();
-
-    auto latency = std::chrono::duration_cast<std::chrono::microseconds>(
-      end_time - start_time);
-
-    octx.rctx.probe().record_fetch_plan_and_execute_measurement(
-      latency, bytes_left_before == octx.bytes_left);
-
-    if (octx.should_stop_fetch()) {
-        co_return;
-    }
-
-    octx.reset_context();
-    // debounce next read retry
-    co_await ss::sleep(std::min(
-      config::shard_local_cfg().fetch_reads_debounce_timeout(),
-      octx.request.data.max_wait_ms));
-}

namespace testing {
kafka::fetch_plan make_simple_fetch_plan(op_context& octx) {
auto planner = make_fetch_planner<simple_fetch_planner>();
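
Taken together, do_fetch's old polling branch and fetch_topic_partitions amounted to: fetch once immediately, then keep re-planning and re-executing with a debounce sleep between rounds until the stop condition holds. A simplified, non-Seastar sketch of that control flow (names assumed; std::this_thread stands in for ss::sleep):

#include <chrono>
#include <functional>
#include <thread>

// Debounced polling loop, repeated until should_stop() holds.
void polling_fetch_loop(
  const std::function<bool()>& should_stop,
  const std::function<void()>& plan_and_execute,
  std::chrono::milliseconds debounce) {
    plan_and_execute(); // first fetch, do not wait
    while (!should_stop()) {
        std::this_thread::sleep_for(debounce); // debounce next read retry
        plan_and_execute();
    }
}
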
@@ -1411,26 +1290,11 @@ kafka::fetch_plan make_simple_fetch_plan(op_context& octx) {

namespace {
ss::future<> do_fetch(op_context& octx) {
-    switch (config::shard_local_cfg().fetch_read_strategy) {
-    case model::fetch_read_strategy::polling: {
-        // first fetch, do not wait
-        co_await fetch_topic_partitions(octx).then([&octx] {
-            return ss::do_until(
-              [&octx] { return octx.should_stop_fetch(); },
-              [&octx] { return fetch_topic_partitions(octx); });
-        });
-    } break;
-    case model::fetch_read_strategy::non_polling: {
-        auto planner = make_fetch_planner<simple_fetch_planner>();
-        auto fetch_plan = planner.create_plan(octx);
-
-        nonpolling_fetch_plan_executor executor;
-        co_await executor.execute_plan(octx, std::move(fetch_plan));
-    } break;
-    default: {
-        vassert(false, "not implemented");
-    } break;
-    }
+    auto planner = make_fetch_planner<simple_fetch_planner>();
+    auto fetch_plan = planner.create_plan(octx);
+
+    nonpolling_fetch_plan_executor executor;
+    co_await executor.execute_plan(octx, std::move(fetch_plan));
}
} // namespace

19 changes: 0 additions & 19 deletions src/v/model/metadata.h
@@ -544,25 +544,6 @@ operator<<(std::ostream& os, cloud_storage_chunk_eviction_strategy st) {
}
}

-enum class fetch_read_strategy : uint8_t {
-    polling = 0,
-    non_polling = 1,
-};
-
-constexpr const char* fetch_read_strategy_to_string(fetch_read_strategy s) {
-    switch (s) {
-    case fetch_read_strategy::polling:
-        return "polling";
-    case fetch_read_strategy::non_polling:
-        return "non_polling";
-    default:
-        throw std::invalid_argument("unknown fetch_read_strategy");
-    }
-}
-
-std::ostream& operator<<(std::ostream&, fetch_read_strategy);
-std::istream& operator>>(std::istream&, fetch_read_strategy&);

/**
* Type representing MPX virtual cluster. MPX uses XID to identify clusters.
*/
18 changes: 0 additions & 18 deletions src/v/model/model.cc
@@ -481,24 +481,6 @@ std::ostream& operator<<(std::ostream& o, const batch_identity& bid) {
return o;
}

-std::ostream& operator<<(std::ostream& o, fetch_read_strategy s) {
-    o << fetch_read_strategy_to_string(s);
-    return o;
-}
-
-std::istream& operator>>(std::istream& i, fetch_read_strategy& strat) {
-    ss::sstring s;
-    i >> s;
-    strat = string_switch<fetch_read_strategy>(s)
-              .match(
-                fetch_read_strategy_to_string(fetch_read_strategy::polling),
-                fetch_read_strategy::polling)
-              .match(
-                fetch_read_strategy_to_string(fetch_read_strategy::non_polling),
-                fetch_read_strategy::non_polling);
-    return i;
-}

std::ostream& operator<<(std::ostream& o, write_caching_mode mode) {
o << write_caching_mode_to_string(mode);
return o;
