Skip to content

Commit

Permalink
Merge pull request #11443 from michael-redpanda/dont-encode-zero-valu…
Browse files Browse the repository at this point in the history
…e-flex-variables

k/protocol: Checking for tag value to be 0
  • Loading branch information
piyushredpanda authored Jun 23, 2023
2 parents 9ca614b + e77a935 commit 957eab6
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/v/kafka/protocol/errors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ std::string_view error_code_to_str(error_code error) {
case error_code::transactional_id_not_found:
return "transactional_id_not_found";
default:
std::terminate(); // make gcc happy
return "unknown_error_code";
}
}

Expand Down
19 changes: 13 additions & 6 deletions src/v/kafka/protocol/schemata/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1358,21 +1358,28 @@ class response;
{%- endif %}
{%- endmacro %}
{% macro conditional_tag_encode(tdef, vec) %}
{% macro conditional_tag_encode(tdef, vec, obj = "") %}
{%- if obj %}
{%- set fname = obj + "." + tdef.name %}
{%- else %}
{%- set fname = tdef.name %}
{%- endif %}
{%- if tdef.nullable() %}
if ({{ tdef.name }}) {
if ({{ fname }}) {
{{ vec }}.push_back({{ tdef.tag() }});
}
{%- elif tdef.is_array %}
if (!{{ tdef.name }}.empty()) {
if (!{{ fname }}.empty()) {
{{ vec }}.push_back({{ tdef.tag() }});
}
{%- elif tdef.default_value() != "" %}
if ({{ tdef.name }} != {{ tdef.default_value() }}) {
if ({{ fname }} != {{ tdef.default_value() }}) {
{{ vec }}.push_back({{ tdef.tag() }});
}
{%- else %}
{{ vec }}.push_back({{ tdef.tag() }});
if ({{ fname }} != {{ tdef.type_name.0 }}{0}) {
{{ vec }}.push_back({{ tdef.tag() }});
}
{%- endif %}
{%- endmacro %}
Expand All @@ -1381,7 +1388,7 @@ class response;
std::vector<uint32_t> to_encode;
{%- for tdef in tag_definitions -%}
{%- call tag_version_guard(tdef) %}
{{- conditional_tag_encode(tdef, "to_encode") }}
{{- conditional_tag_encode(tdef, "to_encode", obj) }}
{%- endcall %}
{%- endfor %}
{%- set tf = "unknown_tags" %}
Expand Down
137 changes: 137 additions & 0 deletions src/v/kafka/protocol/tests/protocol_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,145 @@ void check_all_requests(kafka::type_list<Ts...>) {
(check_proto_compat<Ts>(), ...);
}

template<typename Request, api_version::type version, bool is_request>
requires(KafkaApiHandler<Request>)
struct tag_field_entry {
using api = Request::api;
static constexpr api_version test_version = api_version(version);
static constexpr bool request = is_request;
};

// Instructions on adding optional tag field messages
// When a new kafka message is added that contains optional tag fields,
// you will do the following:
// 1. Add the entry to `tag_field_entries`, providing
// a. The handler
// b. minimum api version
// c. true for request, false for response (needs to be bool for constexpr)
// 2. Create a specialized `create_default_and_non_default_data` function
// a. Have it set non-default values and default values for each
// b. Provide the difference in size the encoded buffers will be

using tag_field_entries = kafka::type_list<
tag_field_entry<create_topics_handler, 5, false>,
tag_field_entry<api_versions_handler, 3, false>>;

template<typename T>
long create_default_and_non_default_data(T& non_default_data, T& default_data);

template<>
long create_default_and_non_default_data(
decltype(create_topics_response::data)& non_default_data,
decltype(create_topics_response::data)& default_data) {
non_default_data.throttle_time_ms = std::chrono::milliseconds(1000);

non_default_data.topics.emplace_back(creatable_topic_result{
model::topic{"topic1"},
{},
kafka::error_code{1},
"test_error_message",
3,
16,
std::nullopt,
kafka::error_code{2}});

default_data = non_default_data;

default_data.topics.at(0).topic_config_error_code = kafka::error_code{0};

// int16 (2 bytes) + tag (2 bytes)
return 4;
}

template<>
long create_default_and_non_default_data(
decltype(api_versions_response::data)& non_default_data,
decltype(api_versions_response::data)& default_data) {
non_default_data.finalized_features_epoch = 0;
default_data = non_default_data;
default_data.finalized_features_epoch = -1;

// int64 (8 bytes) + tag (2 bytes)
return 10;
}

template<typename T>
bool validate_buffer_against_data(
const T& check_data, api_version version, const bytes& buffer) {
T r;
if constexpr (HasPrimitiveDecode<decltype(r)>) {
r.decode(bytes_to_iobuf(buffer), version);
} else {
kafka::protocol::decoder rdr(bytes_to_iobuf(buffer));
r.decode(rdr, version);
}

return r == check_data;
}

template<typename T>
void check_kafka_tag_format(api_version version, bool is_request) {
decltype(T::data) non_default_data{};
decltype(T::data) default_data{};

auto size_diff = create_default_and_non_default_data(
non_default_data, default_data);

BOOST_REQUIRE_NE(non_default_data, default_data);

bytes non_default_encoded, default_encoded;

{
iobuf iob;
kafka::protocol::encoder rw(iob);
non_default_data.encode(rw, version);
non_default_encoded = iobuf_to_bytes(iob);
}

{
iobuf iob;
kafka::protocol::encoder rw(iob);
default_data.encode(rw, version);
default_encoded = iobuf_to_bytes(iob);
}

BOOST_CHECK_EQUAL(
non_default_encoded.size() - default_encoded.size(), size_diff);

BOOST_TEST_CHECK(validate_buffer_against_data(
non_default_data, version, non_default_encoded));
BOOST_TEST_CHECK(
validate_buffer_against_data(default_data, version, default_encoded));
}

template<typename T>
concept FieldEntry = kafka::KafkaApi<typename T::api> && requires(T t) {
{ T::test_version } -> std::convertible_to<const api_version&>;
{ T::request } -> std::convertible_to<const bool>;
};

template<FieldEntry T>
void check_tag_request() {
if constexpr (T::request) {
check_kafka_tag_format<typename T::api::request_type>(
T::test_version, T::request);
} else {
check_kafka_tag_format<typename T::api::response_type>(
T::test_version, T::request);
}
}

template<typename... Ts>
void check_all_tag_requests(kafka::type_list<Ts...>) {
(check_tag_request<Ts>(), ...);
}

BOOST_AUTO_TEST_CASE(test_kafka_protocol_compat) {
check_all_requests(kafka::request_types{});
}

BOOST_AUTO_TEST_CASE(test_optional_tag_values) {
check_all_tag_requests(tag_field_entries{});
}

} // namespace kafka

0 comments on commit 957eab6

Please sign in to comment.