From 31a536636c0124013898228344c3ddd3600ccef8 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Tue, 11 Jan 2022 22:24:12 +0000 Subject: [PATCH] fix: skip over KIP-482 tagged fields The decoder would error out with "non-empty tagged fields are not supported yet" if it encountered a non-zero length for the tagged fields array. Handle these more graciously (as per the KIP) by skipping over them without deserializing. This fixes an issue with the v3 ApiVersionsResponse on Kafka 2.7.0 and newer which now return the `SupportedFeatures` tagged field on the response (KAFKA-10028). Signed-off-by: Dominic Evans --- api_versions_response_test.go | 2 +- encoder_decoder.go | 4 +++- real_decoder.go | 30 +++++++++++++++++++++--------- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/api_versions_response_test.go b/api_versions_response_test.go index 0baaeb404..26e0bf2a4 100644 --- a/api_versions_response_test.go +++ b/api_versions_response_test.go @@ -19,7 +19,7 @@ var ( 0x00, 0x01, 0x00, // tagged fields 0x00, 0x00, 0x00, 0x00, // throttle time - 0x00, // tagged fields + 0x01, 0x01, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // tagged fields (empty SupportedFeatures) } ) diff --git a/encoder_decoder.go b/encoder_decoder.go index dab54f88c..4aab4d21d 100644 --- a/encoder_decoder.go +++ b/encoder_decoder.go @@ -87,7 +87,9 @@ func versionedDecode(buf []byte, in versionedDecoder, version int16) error { } if helper.off != len(buf) { - return PacketDecodingError{"invalid length"} + return PacketDecodingError{ + Info: fmt.Sprintf("invalid length (off=%d, len=%d)", helper.off, len(buf)), + } } return nil diff --git a/real_decoder.go b/real_decoder.go index bab6cb486..a5b54ebb3 100644 --- a/real_decoder.go +++ b/real_decoder.go @@ -6,13 +6,12 @@ import ( ) var ( - errInvalidArrayLength = PacketDecodingError{"invalid array length"} - errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"} - errInvalidStringLength = PacketDecodingError{"invalid string length"} - errVarintOverflow = PacketDecodingError{"varint overflow"} - errUVarintOverflow = PacketDecodingError{"uvarint overflow"} - errInvalidBool = PacketDecodingError{"invalid bool"} - errUnsupportedTaggedFields = PacketDecodingError{"non-empty tagged fields are not supported yet"} + errInvalidArrayLength = PacketDecodingError{"invalid array length"} + errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"} + errInvalidStringLength = PacketDecodingError{"invalid string length"} + errVarintOverflow = PacketDecodingError{"varint overflow"} + errUVarintOverflow = PacketDecodingError{"uvarint overflow"} + errInvalidBool = PacketDecodingError{"invalid bool"} ) type realDecoder struct { @@ -149,8 +148,21 @@ func (rd *realDecoder) getEmptyTaggedFieldArray() (int, error) { return 0, err } - if tagCount != 0 { - return 0, errUnsupportedTaggedFields + // skip over any tagged fields without deserializing them + // as we don't currently support doing anything with them + for i := uint64(0); i < tagCount; i++ { + // fetch and ignore tag identifier + _, err := rd.getUVarint() + if err != nil { + return 0, err + } + length, err := rd.getUVarint() + if err != nil { + return 0, err + } + if _, err := rd.getRawBytes(int(length)); err != nil { + return 0, err + } } return 0, nil