Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k/protocol: Checking for tag value to be 0 #11443

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/kafka/protocol/errors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ std::string_view error_code_to_str(error_code error) {
case error_code::transactional_id_not_found:
return "transactional_id_not_found";
default:
std::terminate(); // make gcc happy
return "unknown_error_code";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the logic here is that we should crash: if we claim to support a version of a certain Kafka API and then observe an unknown error code, it means there's a bug in Redpanda and the handler is implemented incorrectly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I guess that is debatable though, realistically perhaps crashing is too harsh.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I guess that is debatable though, realistically perhaps crashing is too harsh.

I feel like this is an easy DoS to run against RP

}
}

Expand Down
19 changes: 13 additions & 6 deletions src/v/kafka/protocol/schemata/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1335,21 +1335,28 @@ class response;
{%- endif %}
{%- endmacro %}

{# Emits code that records a tagged field's tag in `vec` only when the
   field must appear on the wire: a nullable field that is engaged, a
   non-empty array, or a value differing from its (schema or
   value-initialized) default. `obj` optionally prefixes the field access
   (e.g. "data."). #}
{% macro conditional_tag_encode(tdef, vec, obj = "") %}
{%- if obj %}
{%- set fname = obj + "." + tdef.name %}
{%- else %}
{%- set fname = tdef.name %}
{%- endif %}
{%- if tdef.nullable() %}
if ({{ fname }}) {
    {{ vec }}.push_back({{ tdef.tag() }});
}
{%- elif tdef.is_array %}
if (!{{ fname }}.empty()) {
    {{ vec }}.push_back({{ tdef.tag() }});
}
{%- elif tdef.default_value() != "" %}
if ({{ fname }} != {{ tdef.default_value() }}) {
    {{ vec }}.push_back({{ tdef.tag() }});
}
{%- else %}
{# No schema default: compare against the value-initialized type so a
   zero-valued tag is omitted rather than unconditionally encoded. #}
if ({{ fname }} != {{ tdef.type_name.0 }}{0}) {
    {{ vec }}.push_back({{ tdef.tag() }});
}
{%- endif %}
{%- endmacro %}

Expand All @@ -1358,7 +1365,7 @@ class response;
std::vector<uint32_t> to_encode;
{%- for tdef in tag_definitions -%}
{%- call tag_version_guard(tdef) %}
{{- conditional_tag_encode(tdef, "to_encode") }}
{{- conditional_tag_encode(tdef, "to_encode", obj) }}
{%- endcall %}
{%- endfor %}
{%- set tf = "unknown_tags" %}
Expand Down
137 changes: 137 additions & 0 deletions src/v/kafka/protocol/tests/protocol_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,145 @@ void check_all_requests(kafka::type_list<Ts...>) {
(check_proto_compat<Ts>(), ...);
}

// Describes one protocol message to exercise in the optional-tag-field
// test: the API handler, the api_version to encode/decode at, and
// whether the request (true) or response (false) side is checked.
template<typename Request, api_version::type version, bool is_request>
requires(KafkaApiHandler<Request>)
struct tag_field_entry {
    using api = Request::api;
    // Version at which the tagged field under test is present.
    static constexpr api_version test_version = api_version(version);
    // True to test the request type, false for the response type.
    static constexpr bool request = is_request;
};

// Instructions on adding optional tag field messages
// When a new kafka message is added that contains optional tag fields,
// you will do the following:
// 1. Add the entry to `tag_field_entries`, providing
// a. The handler
// b. minimum api version
// c. true for request, false for response (needs to be bool for constexpr)
// 2. Create a specialized `create_default_and_non_default_data` function
// a. Have it set non-default values and default values for each
// b. Provide the difference in size the encoded buffers will be

// Registry of messages with optional tagged fields covered by
// test_optional_tag_values; see the instructions above for adding entries.
using tag_field_entries = kafka::type_list<
  tag_field_entry<create_topics_handler, 5, false>,
  tag_field_entry<api_versions_handler, 3, false>>;

// Fills `non_default_data` with values that force tagged fields onto the
// wire and `default_data` with a copy whose tagged fields hold default
// values. Returns the expected encoded-size difference in bytes between
// the two. Specialized per message data type below.
template<typename T>
long create_default_and_non_default_data(T& non_default_data, T& default_data);

// create_topics_response: topic_config_error_code is the tagged field;
// error_code{2} forces it onto the wire, error_code{0} is the default.
template<>
long create_default_and_non_default_data(
  decltype(create_topics_response::data)& non_default_data,
  decltype(create_topics_response::data)& default_data) {
    non_default_data.throttle_time_ms = std::chrono::milliseconds(1000);

    // One topic result carrying a non-default tagged error code.
    creatable_topic_result topic_result{
      model::topic{"topic1"},
      {},
      kafka::error_code{1},
      "test_error_message",
      3,
      16,
      std::nullopt,
      kafka::error_code{2}};
    non_default_data.topics.push_back(std::move(topic_result));

    // The default copy differs only in the tagged field being zero.
    default_data = non_default_data;
    default_data.topics.at(0).topic_config_error_code = kafka::error_code{0};

    // int16 (2 bytes) + tag (2 bytes)
    return 4;
}

// api_versions_response: finalized_features_epoch is the tagged field.
// Setting it to 0 forces the tag to be encoded; -1 is treated as the
// default and suppresses it.
template<>
long create_default_and_non_default_data(
  decltype(api_versions_response::data)& non_default_data,
  decltype(api_versions_response::data)& default_data) {
    non_default_data.finalized_features_epoch = 0;
    default_data = non_default_data;
    default_data.finalized_features_epoch = -1;

    // int64 (8 bytes) + tag (2 bytes)
    return 10;
}

// Decodes `buffer` at `version` into a fresh T and returns whether the
// result equals `check_data`, i.e. whether the encode/decode round trip
// preserved the message.
template<typename T>
bool validate_buffer_against_data(
  const T& check_data, api_version version, const bytes& buffer) {
    T r;
    // Some message types decode directly from an iobuf; the rest go
    // through a kafka::protocol::decoder.
    if constexpr (HasPrimitiveDecode<decltype(r)>) {
        r.decode(bytes_to_iobuf(buffer), version);
    } else {
        kafka::protocol::decoder rdr(bytes_to_iobuf(buffer));
        r.decode(rdr, version);
    }

    return r == check_data;
}

template<typename T>
void check_kafka_tag_format(api_version version, bool is_request) {
decltype(T::data) non_default_data{};
decltype(T::data) default_data{};

auto size_diff = create_default_and_non_default_data(
non_default_data, default_data);

BOOST_REQUIRE_NE(non_default_data, default_data);

bytes non_default_encoded, default_encoded;

{
iobuf iob;
kafka::protocol::encoder rw(iob);
non_default_data.encode(rw, version);
non_default_encoded = iobuf_to_bytes(iob);
}

{
iobuf iob;
kafka::protocol::encoder rw(iob);
default_data.encode(rw, version);
default_encoded = iobuf_to_bytes(iob);
}

BOOST_CHECK_EQUAL(
non_default_encoded.size() - default_encoded.size(), size_diff);

BOOST_TEST_CHECK(validate_buffer_against_data(
non_default_data, version, non_default_encoded));
BOOST_TEST_CHECK(
michael-redpanda marked this conversation as resolved.
Show resolved Hide resolved
validate_buffer_against_data(default_data, version, default_encoded));
}

// Matches tag_field_entry-like types: a kafka API plus the version to
// test and a request/response selector flag.
template<typename T>
concept FieldEntry = kafka::KafkaApi<typename T::api> && requires(T t) {
    { T::test_version } -> std::convertible_to<const api_version&>;
    { T::request } -> std::convertible_to<const bool>;
};

// Runs the tag-format check against the side (request or response)
// selected by the entry's compile-time `request` flag.
template<FieldEntry T>
void check_tag_request() {
    if constexpr (!T::request) {
        check_kafka_tag_format<typename T::api::response_type>(
          T::test_version, T::request);
    } else {
        check_kafka_tag_format<typename T::api::request_type>(
          T::test_version, T::request);
    }
}

// Expands the entry list and runs check_tag_request on every element.
template<typename... Entries>
void check_all_tag_requests(kafka::type_list<Entries...>) {
    (check_tag_request<Entries>(), ...);
}

// Compatibility check across all kafka request/response schemata.
BOOST_AUTO_TEST_CASE(test_kafka_protocol_compat) {
    check_all_requests(kafka::request_types{});
}

// Verifies that optional tagged fields are encoded only when they differ
// from their defaults, for every entry in tag_field_entries.
BOOST_AUTO_TEST_CASE(test_optional_tag_values) {
    check_all_tag_requests(tag_field_entries{});
}

} // namespace kafka