From 096182c72e69fa32e7d99d5e0eb44437d65b330c Mon Sep 17 00:00:00 2001 From: Adrian Preston Date: Mon, 7 Aug 2023 11:26:24 +0100 Subject: [PATCH] feat(proto): support for Metadata V6-V10 In particular, this adds support for topic UUIDs Signed-off-by: Adrian Preston --- broker.go | 41 +++++-- metadata_request.go | 166 +++++++++++++++++++++---- metadata_request_test.go | 122 +++++++++++++++++++ metadata_response.go | 246 ++++++++++++++++++++++++++++++++------ metadata_response_test.go | 235 ++++++++++++++++++++++++++++++++++++ 5 files changed, 745 insertions(+), 65 deletions(-) diff --git a/broker.go b/broker.go index 9a6297347e..ff8cc81252 100644 --- a/broker.go +++ b/broker.go @@ -371,6 +371,7 @@ func (b *Broker) Rack() string { // GetMetadata send a metadata request and returns a metadata response or error func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) { response := new(MetadataResponse) + response.Version = request.Version // Required to ensure use of the correct response header version err := b.sendAndReceive(request, response) if err != nil { @@ -1072,7 +1073,12 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) { return err } - host, err := pd.getString() + var host string + if version < 9 { + host, err = pd.getString() + } else { + host, err = pd.getCompactString() + } if err != nil { return err } @@ -1082,11 +1088,13 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) { return err } - if version >= 1 { + if version >= 1 && version < 9 { b.rack, err = pd.getNullableString() - if err != nil { - return err - } + } else if version >= 9 { + b.rack, err = pd.getCompactNullableString() + } + if err != nil { + return err } b.addr = net.JoinHostPort(host, fmt.Sprint(port)) @@ -1094,6 +1102,13 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) { return err } + if version >= 9 { + _, err := pd.getEmptyTaggedFieldArray() + if err != nil { + return err + } + } + return nil } @@ -1110,7 +1125,11 @@ func (b *Broker) encode(pe packetEncoder, version int16) (err error) { pe.putInt32(b.id) - err = pe.putString(host) + if version < 9 { + err = pe.putString(host) + } else { + err = pe.putCompactString(host) + } if err != nil { return err } @@ -1118,12 +1137,20 @@ func (b *Broker) encode(pe packetEncoder, version int16) (err error) { pe.putInt32(int32(port)) if version >= 1 { - err = pe.putNullableString(b.rack) + if version < 9 { + err = pe.putNullableString(b.rack) + } else { + err = pe.putNullableCompactString(b.rack) + } if err != nil { return err } } + if version >= 9 { + pe.putEmptyTaggedFieldArray() + } + return nil } diff --git a/metadata_request.go b/metadata_request.go index 9e1e61b171..e76073ea0d 100644 --- a/metadata_request.go +++ b/metadata_request.go @@ -1,17 +1,35 @@ package sarama +import "encoding/base64" + +type Uuid [16]byte + +func (u Uuid) String() string { + return base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString(u[:]) +} + +var NullUUID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + type MetadataRequest struct { // Version defines the protocol version to use for encode and decode Version int16 // Topics contains the topics to fetch metadata for. Topics []string // AllowAutoTopicCreation contains a If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so. - AllowAutoTopicCreation bool + AllowAutoTopicCreation bool + IncludeClusterAuthorizedOperations bool // version 8 and up + IncludeTopicAuthorizedOperations bool // version 8 and up } func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest { m := &MetadataRequest{Topics: topics} - if version.IsAtLeast(V2_1_0_0) { + if version.IsAtLeast(V2_8_0_0) { + m.Version = 10 + } else if version.IsAtLeast(V2_4_0_0) { + m.Version = 9 + } else if version.IsAtLeast(V2_4_0_0) { + m.Version = 8 + } else if version.IsAtLeast(V2_1_0_0) { m.Version = 7 } else if version.IsAtLeast(V2_0_0_0) { m.Version = 6 @@ -28,46 +46,124 @@ func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest } func (r *MetadataRequest) encode(pe packetEncoder) (err error) { - if r.Version < 0 || r.Version > 7 { + if r.Version < 0 || r.Version > 10 { return PacketEncodingError{"invalid or unsupported MetadataRequest version field"} } if r.Version == 0 || len(r.Topics) > 0 { - err := pe.putArrayLength(len(r.Topics)) - if err != nil { - return err - } - - for i := range r.Topics { - err = pe.putString(r.Topics[i]) + if r.Version < 9 { + err := pe.putArrayLength(len(r.Topics)) if err != nil { return err } + + for i := range r.Topics { + err = pe.putString(r.Topics[i]) + if err != nil { + return err + } + } + } else if r.Version == 9 { + pe.putCompactArrayLength(len(r.Topics)) + for _, topicName := range r.Topics { + if err := pe.putCompactString(topicName); err != nil { + return err + } + pe.putEmptyTaggedFieldArray() + } + } else { // r.Version = 10 + pe.putCompactArrayLength(len(r.Topics)) + for _, topicName := range r.Topics { + if err := pe.putRawBytes(NullUUID); err != nil { + return err + } + // Avoid implicit memory aliasing in for loop + tn := topicName + if err := pe.putNullableCompactString(&tn); err != nil { + return err + } + pe.putEmptyTaggedFieldArray() + } } } else { - pe.putInt32(-1) + if r.Version < 9 { + pe.putInt32(-1) + } else { + pe.putCompactArrayLength(-1) + } } - if r.Version >= 4 { + if r.Version > 3 { pe.putBool(r.AllowAutoTopicCreation) } - + if r.Version > 7 { + pe.putBool(r.IncludeClusterAuthorizedOperations) + pe.putBool(r.IncludeTopicAuthorizedOperations) + } + if r.Version > 8 { + pe.putEmptyTaggedFieldArray() + } return nil } func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) { r.Version = version - size, err := pd.getInt32() - if err != nil { - return err - } - if size > 0 { - r.Topics = make([]string, size) + if r.Version < 9 { + size, err := pd.getInt32() + if err != nil { + return err + } + if size > 0 { + r.Topics = make([]string, size) + for i := range r.Topics { + topic, err := pd.getString() + if err != nil { + return err + } + r.Topics[i] = topic + } + } + } else if r.Version == 9 { + size, err := pd.getCompactArrayLength() + if err != nil { + return err + } + if size > 0 { + r.Topics = make([]string, size) + } for i := range r.Topics { - topic, err := pd.getString() + topic, err := pd.getCompactString() if err != nil { return err } r.Topics[i] = topic + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + } else { // version 10+ + size, err := pd.getCompactArrayLength() + if err != nil { + return err + } + + if size > 0 { + r.Topics = make([]string, size) + } + for i := range r.Topics { + if _, err = pd.getRawBytes(16); err != nil { // skip UUID + return err + } + topic, err := pd.getCompactNullableString() + if err != nil { + return err + } + if topic != nil { + r.Topics[i] = *topic + } + + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } } } @@ -77,6 +173,23 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) { } } + if r.Version > 7 { + includeClusterAuthz, err := pd.getBool() + if err != nil { + return err + } + r.IncludeClusterAuthorizedOperations = includeClusterAuthz + includeTopicAuthz, err := pd.getBool() + if err != nil { + return err + } + r.IncludeTopicAuthorizedOperations = includeTopicAuthz + } + if r.Version > 8 { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } return nil } @@ -89,15 +202,24 @@ func (r *MetadataRequest) version() int16 { } func (r *MetadataRequest) headerVersion() int16 { + if r.Version >= 9 { + return 2 + } return 1 } func (r *MetadataRequest) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 7 + return r.Version >= 0 && r.Version <= 10 } func (r *MetadataRequest) requiredVersion() KafkaVersion { switch r.Version { + case 10: + return V2_8_0_0 + case 9: + return V2_4_0_0 + case 8: + return V2_3_0_0 case 7: return V2_1_0_0 case 6: @@ -113,6 +235,6 @@ func (r *MetadataRequest) requiredVersion() KafkaVersion { case 0: return V0_8_2_0 default: - return V2_1_0_0 + return V2_8_0_0 } } diff --git a/metadata_request_test.go b/metadata_request_test.go index 16f6b59425..f999930324 100644 --- a/metadata_request_test.go +++ b/metadata_request_test.go @@ -60,6 +60,54 @@ var ( metadataRequestNoTopicsV5 = append(metadataRequestNoTopicsV1, byte(0)) metadataRequestAutoCreateV5 = append(metadataRequestOneTopicV3, byte(1)) metadataRequestNoAutoCreateV5 = append(metadataRequestOneTopicV3, byte(0)) + + // The v6 metadata request and response are the same as v5. I know, right. + metadataRequestNoTopicsV6 = metadataRequestNoTopicsV5 + metadataRequestAutoCreateV6 = metadataRequestAutoCreateV5 + metadataRequestNoAutoCreateV6 = metadataRequestNoAutoCreateV5 + + // The v7 metadata request is the same as v6. An additional field for + // leader epoch has been added to the partition metadata in the v7 response. + metadataRequestNoTopicsV7 = metadataRequestNoTopicsV6 + metadataRequestAutoCreateV7 = metadataRequestAutoCreateV6 + metadataRequestNoAutoCreateV7 = metadataRequestNoAutoCreateV6 + + // The v8 metadata request has additional fields for including cluster authorized operations + // and including topic authorized operations. An additional field for cluster authorized operations + // has been added to the v8 metadata response, and an additional field for topic authorized operations + // has been added to the topic metadata in the v8 metadata response. + metadataRequestNoTopicsV8 = append(metadataRequestNoTopicsV7, []byte{0, 0}...) + metadataRequestAutoCreateV8 = append(metadataRequestAutoCreateV7, []byte{0, 0}...) + metadataRequestNoAutoCreateV8 = append(metadataRequestNoAutoCreateV7, []byte{0, 0}...) + // Appending to an empty slice means we are creating a new backing array, rather than updating the backing array + // for the slice metadataRequestAutoCreateV7 + metadataRequestAutoCreateClusterAuthTopicAuthV8 = append(append([]byte{}, metadataRequestAutoCreateV7...), []byte{1, 1}...) + + // In v9 tag buffers have been added to the end of arrays, and various types have been replaced with compact types. + metadataRequestNoTopicsV9 = []byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, + } + + metadataRequestOneTopicV9 = []byte{ + 2, 7, 't', 'o', 'p', 'i', 'c', '1', 0, 0, 0, 0, 0, + } + + metadataRequestOneTopicAutoCreateTopicV9 = []byte{ + 2, 7, 't', 'o', 'p', 'i', 'c', '1', 0, 1, 0, 1, 0, + } + + // v10 added topic UUIDs to the metadata request and responses, and made the topic name nullable in the request. + metadataRequestNoTopicsV10 = metadataRequestNoTopicsV9 + + metadataRequestTwoTopicsV10 = []byte{ + 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 't', 'o', 'p', 'i', 'c', '1', + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 't', 'o', 'p', 'i', 'c', '2', 0, 0, 0, 0, 0, + } + + metadataRequestAutoCreateClusterAuthTopicAuthV10 = []byte{ + 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 't', 'o', 'p', 'i', 'c', '1', + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 't', 'o', 'p', 'i', 'c', '2', 0, 1, 1, 1, 0, + } ) func TestMetadataRequestV0(t *testing.T) { @@ -136,3 +184,77 @@ func TestMetadataRequestV5(t *testing.T) { request.AllowAutoTopicCreation = false testRequest(t, "one topic", request, metadataRequestNoAutoCreateV5) } + +func TestMetadataRequestV6(t *testing.T) { + request := new(MetadataRequest) + request.Version = 6 + testRequest(t, "no topics", request, metadataRequestNoTopicsV6) + + request.Topics = []string{"topic1"} + + request.AllowAutoTopicCreation = true + testRequest(t, "one topic", request, metadataRequestAutoCreateV6) + + request.AllowAutoTopicCreation = false + testRequest(t, "one topic", request, metadataRequestNoAutoCreateV6) +} + +func TestMetadataRequestV7(t *testing.T) { + request := new(MetadataRequest) + request.Version = 7 + testRequest(t, "no topics", request, metadataRequestNoTopicsV7) + + request.Topics = []string{"topic1"} + + request.AllowAutoTopicCreation = true + testRequest(t, "one topic", request, metadataRequestAutoCreateV7) + + request.AllowAutoTopicCreation = false + testRequest(t, "one topic", request, metadataRequestNoAutoCreateV7) +} + +func TestMetadataRequestV8(t *testing.T) { + request := new(MetadataRequest) + request.Version = 8 + testRequest(t, "no topics", request, metadataRequestNoTopicsV8) + + request.Topics = []string{"topic1"} + + request.AllowAutoTopicCreation = true + testRequest(t, "one topic, auto create", request, metadataRequestAutoCreateV8) + + request.AllowAutoTopicCreation = false + testRequest(t, "one topic, no auto create", request, metadataRequestNoAutoCreateV8) + + request.AllowAutoTopicCreation = true + request.IncludeClusterAuthorizedOperations = true + request.IncludeTopicAuthorizedOperations = true + testRequest(t, "one topic, auto create, cluster auth, topic auth", request, metadataRequestAutoCreateClusterAuthTopicAuthV8) +} + +func TestMetadataRequestV9(t *testing.T) { + request := new(MetadataRequest) + request.Version = 9 + testRequest(t, "no topics", request, metadataRequestNoTopicsV9) + + request.Topics = []string{"topic1"} + testRequest(t, "one topic", request, metadataRequestOneTopicV9) + + request.AllowAutoTopicCreation = true + request.IncludeTopicAuthorizedOperations = true + testRequest(t, "one topic, auto create, no cluster auth, topic auth", request, metadataRequestOneTopicAutoCreateTopicV9) +} + +func TestMetadataRequestV10(t *testing.T) { + request := new(MetadataRequest) + request.Version = 10 + testRequest(t, "no topics", request, metadataRequestNoTopicsV10) + + request.Topics = []string{"topic1", "topic2"} + testRequest(t, "one topic", request, metadataRequestTwoTopicsV10) + + request.AllowAutoTopicCreation = true + request.IncludeClusterAuthorizedOperations = true + request.IncludeTopicAuthorizedOperations = true + testRequest(t, "one topic, auto create, cluster auth, topic auth", request, metadataRequestAutoCreateClusterAuthTopicAuthV10) +} diff --git a/metadata_response.go b/metadata_response.go index 902364963e..3f0059fe7f 100644 --- a/metadata_response.go +++ b/metadata_response.go @@ -44,16 +44,38 @@ func (p *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) } } - if p.Replicas, err = pd.getInt32Array(); err != nil { + if p.Version < 9 { + p.Replicas, err = pd.getInt32Array() + } else { + p.Replicas, err = pd.getCompactInt32Array() + } + if err != nil { return err } - if p.Isr, err = pd.getInt32Array(); err != nil { + if p.Version < 9 { + p.Isr, err = pd.getInt32Array() + } else { + p.Isr, err = pd.getCompactInt32Array() + } + if err != nil { return err } if p.Version >= 5 { - if p.OfflineReplicas, err = pd.getInt32Array(); err != nil { + if p.Version < 9 { + p.OfflineReplicas, err = pd.getInt32Array() + } else { + p.OfflineReplicas, err = pd.getCompactInt32Array() + } + if err != nil { + return err + } + } + + if p.Version >= 9 { + _, err = pd.getEmptyTaggedFieldArray() + if err != nil { return err } } @@ -73,20 +95,39 @@ func (p *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) pe.putInt32(p.LeaderEpoch) } - if err := pe.putInt32Array(p.Replicas); err != nil { + if p.Version < 9 { + err = pe.putInt32Array(p.Replicas) + } else { + err = pe.putCompactInt32Array(p.Replicas) + } + if err != nil { return err } - if err := pe.putInt32Array(p.Isr); err != nil { + if p.Version < 9 { + err = pe.putInt32Array(p.Isr) + } else { + err = pe.putCompactInt32Array(p.Isr) + } + if err != nil { return err } if p.Version >= 5 { - if err := pe.putInt32Array(p.OfflineReplicas); err != nil { + if p.Version < 9 { + err = pe.putInt32Array(p.OfflineReplicas) + } else { + err = pe.putCompactInt32Array(p.OfflineReplicas) + } + if err != nil { return err } } + if p.Version >= 9 { + pe.putEmptyTaggedFieldArray() + } + return nil } @@ -98,10 +139,12 @@ type TopicMetadata struct { Err KError // Name contains the topic name. Name string + Uuid Uuid // IsInternal contains a True if the topic is internal. IsInternal bool // Partitions contains each partition in the topic. - Partitions []*PartitionMetadata + Partitions []*PartitionMetadata + TopicAuthorizedOperations int32 // Only valid for Version >= 8 } func (t *TopicMetadata) decode(pd packetDecoder, version int16) (err error) { @@ -112,21 +155,44 @@ func (t *TopicMetadata) decode(pd packetDecoder, version int16) (err error) { } t.Err = KError(tmp) - if t.Name, err = pd.getString(); err != nil { + if t.Version < 9 { + t.Name, err = pd.getString() + } else { + t.Name, err = pd.getCompactString() + } + if err != nil { return err } + if t.Version >= 10 { + uuid, err := pd.getRawBytes(16) + if err != nil { + return err + } + t.Uuid = [16]byte{} + for i := 0; i < 16; i++ { + t.Uuid[i] = uuid[i] + } + } + if t.Version >= 1 { - if t.IsInternal, err = pd.getBool(); err != nil { + t.IsInternal, err = pd.getBool() + if err != nil { return err } } - if numPartitions, err := pd.getArrayLength(); err != nil { + var n int + if t.Version < 9 { + n, err = pd.getArrayLength() + } else { + n, err = pd.getCompactArrayLength() + } + if err != nil { return err } else { - t.Partitions = make([]*PartitionMetadata, numPartitions) - for i := 0; i < numPartitions; i++ { + t.Partitions = make([]*PartitionMetadata, n) + for i := 0; i < n; i++ { block := &PartitionMetadata{} if err := block.decode(pd, t.Version); err != nil { return err @@ -135,6 +201,20 @@ func (t *TopicMetadata) decode(pd packetDecoder, version int16) (err error) { } } + if t.Version >= 8 { + t.TopicAuthorizedOperations, err = pd.getInt32() + if err != nil { + return err + } + } + + if t.Version >= 9 { + _, err = pd.getEmptyTaggedFieldArray() + if err != nil { + return err + } + } + return nil } @@ -142,16 +222,33 @@ func (t *TopicMetadata) encode(pe packetEncoder, version int16) (err error) { t.Version = version pe.putInt16(int16(t.Err)) - if err := pe.putString(t.Name); err != nil { + if t.Version < 9 { + err = pe.putString(t.Name) + } else { + err = pe.putCompactString(t.Name) + } + if err != nil { return err } + if t.Version >= 10 { + err = pe.putRawBytes(t.Uuid[:]) + if err != nil { + return err + } + } + if t.Version >= 1 { pe.putBool(t.IsInternal) } - if err := pe.putArrayLength(len(t.Partitions)); err != nil { - return err + if t.Version < 9 { + err = pe.putArrayLength(len(t.Partitions)) + if err != nil { + return err + } + } else { + pe.putCompactArrayLength(len(t.Partitions)) } for _, block := range t.Partitions { if err := block.encode(pe, t.Version); err != nil { @@ -159,6 +256,14 @@ func (t *TopicMetadata) encode(pe packetEncoder, version int16) (err error) { } } + if t.Version >= 8 { + pe.putInt32(t.TopicAuthorizedOperations) + } + + if t.Version >= 9 { + pe.putEmptyTaggedFieldArray() + } + return nil } @@ -174,7 +279,8 @@ type MetadataResponse struct { // ControllerID contains the ID of the controller broker. ControllerID int32 // Topics contains each topic in the response. - Topics []*TopicMetadata + Topics []*TopicMetadata + ClusterAuthorizedOperations int32 // Only valid for Version >= 8 } func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { @@ -185,12 +291,18 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { } } - n, err := pd.getArrayLength() + var brokerArrayLen int + if r.Version < 9 { + brokerArrayLen, err = pd.getArrayLength() + } else { + brokerArrayLen, err = pd.getCompactArrayLength() + } if err != nil { return err } - r.Brokers = make([]*Broker, n) - for i := 0; i < n; i++ { + + r.Brokers = make([]*Broker, brokerArrayLen) + for i := 0; i < brokerArrayLen; i++ { r.Brokers[i] = new(Broker) err = r.Brokers[i].decode(pd, version) if err != nil { @@ -199,7 +311,12 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { } if r.Version >= 2 { - if r.ClusterID, err = pd.getNullableString(); err != nil { + if r.Version < 9 { + r.ClusterID, err = pd.getNullableString() + } else { + r.ClusterID, err = pd.getCompactNullableString() + } + if err != nil { return err } } @@ -210,16 +327,36 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { } } - if numTopics, err := pd.getArrayLength(); err != nil { - return err + var topicArrayLen int + if version < 9 { + topicArrayLen, err = pd.getArrayLength() } else { - r.Topics = make([]*TopicMetadata, numTopics) - for i := 0; i < numTopics; i++ { - block := &TopicMetadata{} - if err := block.decode(pd, r.Version); err != nil { - return err - } - r.Topics[i] = block + topicArrayLen, err = pd.getCompactArrayLength() + } + if err != nil { + return err + } + + r.Topics = make([]*TopicMetadata, topicArrayLen) + for i := 0; i < topicArrayLen; i++ { + r.Topics[i] = new(TopicMetadata) + err = r.Topics[i].decode(pd, version) + if err != nil { + return err + } + } + + if r.Version >= 8 { + r.ClusterAuthorizedOperations, err = pd.getInt32() + if err != nil { + return err + } + } + + if r.Version >= 9 { + _, err := pd.getEmptyTaggedFieldArray() + if err != nil { + return err } } @@ -231,9 +368,15 @@ func (r *MetadataResponse) encode(pe packetEncoder) (err error) { pe.putInt32(r.ThrottleTimeMs) } - if err := pe.putArrayLength(len(r.Brokers)); err != nil { - return err + if r.Version < 9 { + err = pe.putArrayLength(len(r.Brokers)) + if err != nil { + return err + } + } else { + pe.putCompactArrayLength(len(r.Brokers)) } + for _, broker := range r.Brokers { err = broker.encode(pe, r.Version) if err != nil { @@ -242,8 +385,16 @@ func (r *MetadataResponse) encode(pe packetEncoder) (err error) { } if r.Version >= 2 { - if err := pe.putNullableString(r.ClusterID); err != nil { - return err + if r.Version < 9 { + err = pe.putNullableString(r.ClusterID) + if err != nil { + return err + } + } else { + err = pe.putNullableCompactString(r.ClusterID) + if err != nil { + return err + } } } @@ -251,7 +402,12 @@ func (r *MetadataResponse) encode(pe packetEncoder) (err error) { pe.putInt32(r.ControllerID) } - if err := pe.putArrayLength(len(r.Topics)); err != nil { + if r.Version < 9 { + err = pe.putArrayLength(len(r.Topics)) + } else { + pe.putCompactArrayLength(len(r.Topics)) + } + if err != nil { return err } for _, block := range r.Topics { @@ -260,6 +416,14 @@ func (r *MetadataResponse) encode(pe packetEncoder) (err error) { } } + if r.Version >= 8 { + pe.putInt32(r.ClusterAuthorizedOperations) + } + + if r.Version >= 9 { + pe.putEmptyTaggedFieldArray() + } + return nil } @@ -272,7 +436,11 @@ func (r *MetadataResponse) version() int16 { } func (r *MetadataResponse) headerVersion() int16 { - return 0 + if r.Version < 9 { + return 0 + } else { + return 1 + } } func (r *MetadataResponse) isValidVersion() bool { @@ -281,6 +449,12 @@ func (r *MetadataResponse) isValidVersion() bool { func (r *MetadataResponse) requiredVersion() KafkaVersion { switch r.Version { + case 10: + return V2_8_0_0 + case 9: + return V2_4_0_0 + case 8: + return V2_3_0_0 case 7: return V2_1_0_0 case 6: @@ -296,7 +470,7 @@ func (r *MetadataResponse) requiredVersion() KafkaVersion { case 0: return V0_8_2_0 default: - return V2_1_0_0 + return V2_8_0_0 } } diff --git a/metadata_response_test.go b/metadata_response_test.go index 9231fd8506..63c1d4801b 100644 --- a/metadata_response_test.go +++ b/metadata_response_test.go @@ -109,6 +109,70 @@ var ( 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, } + + OneTopicV6 = []byte{ + 0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 'h', 'o', 's', + 't', 0x00, 0x00, 0x23, 0x84, 0xff, 0xff, 0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', + 'I', 'd', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 't', 'o', + 'n', 'y', 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x00, + } + + OneTopicV7 = []byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 'h', 'o', 's', + 't', 0x00, 0x00, 0x23, 0x84, 0xff, 0xff, 0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', + 'I', 'd', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 't', 'o', + 'n', 'y', 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x7b, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, + } + + OneTopicV8 = []byte{ + 0x00, 0x00, 0x00, 0x00, // throttle ms + 0x00, 0x00, 0x00, 0x01, // length brokers + 0x00, 0x00, 0x00, 0x00, // broker[0].nodeid + 0x00, 0x04, // brokers[0].length(nodehost) + 'h', 'o', 's', 't', // broker[0].nodehost + 0x00, 0x00, 0x23, 0x84, // broker[0].port (9092) + 0xff, 0xff, // brokers[0].rack (null) + 0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', + 'I', 'd', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 't', 'o', + 'n', 'y', 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x7b, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 'Y', 0x00, 0x00, 0x00, + 0xea, + } + + OneTopicV9 = []byte{ + 0x00, 0x00, 0x00, 0x00, // throttle ms + 0x02, // length of brokers + 0x00, 0x00, 0x00, 0x00, // broker[0].nodeid + 0x05, // length of brokers[0].nodehost + 'h', 'o', 's', 't', // brokers[0].nodehost + 0x00, 0x00, 0x23, 0x84, // brokers[0].port (9092) + 0x00, // brokers[0].rack (null) + 0x00, // empty tags + 0x0a, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd', // cluster id + 0x00, 0x00, 0x00, + 0x01, 0x02, 0x00, 0x00, 0x05, 't', 'o', 'n', 'y', 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7b, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x02, 0x01, 0x00, 0x00, 0x00, 0x01, 'Y', 0x00, 0x00, 0x00, 0x00, 0xea, 0x00, + } + + OneTopicV10 = []byte{ + 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x05, 'h', 'o', 's', 't', 0x00, 0x00, 0x23, + 0x84, 0x00, 0x00, 0x0a, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd', 0x00, 0x00, 0x00, + 0x01, 0x02, 0x00, 0x00, 0x05, 't', 'o', 'n', 'y', 0x84, 0xcd, 0xa7, 'U', 0x7e, 0x84, 'K', + 0xf9, 0xb7, 0xdc, 0xfc, 0x11, 0x82, 0x07, 'r', 'J', 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7b, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x02, 0x01, 0x00, 0x00, 0x00, 0x01, 'Y', 0x00, 0x00, 0x00, 0x00, 0xea, 0x00, + } ) func TestEmptyMetadataResponseV0(t *testing.T) { @@ -297,3 +361,174 @@ func TestMetadataResponseWithOfflineReplicasV5(t *testing.T) { t.Error("Decoding produced", len(response.Topics[0].Partitions[0].OfflineReplicas), "should have been 1!") } } + +func TestMetadataResponseV6(t *testing.T) { + response := MetadataResponse{} + + testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, OneTopicV6, 6) + if response.ThrottleTimeMs != int32(7) { + t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 7!") + } + if len(response.Brokers) != 1 { + t.Error("Decoding produced", response.Brokers, "should have been 1!") + } + if response.Brokers[0].addr != "host:9092" { + t.Error("Decoding produced", response.Brokers[0].addr, "should have been host:9092!") + } + if response.ControllerID != int32(1) { + t.Error("Decoding produced", response.ControllerID, "should have been 1!") + } + if *response.ClusterID != "clusterId" { + t.Error("Decoding produced", response.ClusterID, "should have been clusterId!") + } + if len(response.Topics) != 1 { + t.Error("Decoding produced", len(response.Topics), "should have been 1!") + } + if len(response.Topics[0].Partitions[0].OfflineReplicas) != 0 { + t.Error("Decoding produced", len(response.Topics[0].Partitions[0].OfflineReplicas), "should have been 0!") + } +} + +func TestMetadataResponseV7(t *testing.T) { + response := MetadataResponse{} + + testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, OneTopicV7, 7) + if response.ThrottleTimeMs != int32(0) { + t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 0!") + } + if len(response.Brokers) != 1 { + t.Error("Decoding produced", response.Brokers, "should have been 1!") + } + if response.Brokers[0].addr != "host:9092" { + t.Error("Decoding produced", response.Brokers[0].addr, "should have been host:9092!") + } + if response.ControllerID != int32(1) { + t.Error("Decoding produced", response.ControllerID, "should have been 1!") + } + if *response.ClusterID != "clusterId" { + t.Error("Decoding produced", response.ClusterID, "should have been clusterId!") + } + if len(response.Topics) != 1 { + t.Error("Decoding produced", len(response.Topics), "should have been 1!") + } + if len(response.Topics[0].Partitions[0].OfflineReplicas) != 0 { + t.Error("Decoding produced", len(response.Topics[0].Partitions[0].OfflineReplicas), "should have been 0!") + } + if response.Topics[0].Partitions[0].LeaderEpoch != 123 { + t.Error("Decoding produced", response.Topics[0].Partitions[0].LeaderEpoch, "should have been 123!") + } +} + +func TestMetadataResponseV8(t *testing.T) { + response := MetadataResponse{} + + testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, OneTopicV8, 8) + if response.ThrottleTimeMs != int32(0) { + t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 0!") + } + if len(response.Brokers) != 1 { + t.Error("Decoding produced", response.Brokers, "should have been 1!") + } + if response.Brokers[0].addr != "host:9092" { + t.Error("Decoding produced", response.Brokers[0].addr, "should have been host:9092!") + } + if response.ControllerID != int32(1) { + t.Error("Decoding produced", response.ControllerID, "should have been 1!") + } + if *response.ClusterID != "clusterId" { + t.Error("Decoding produced", response.ClusterID, "should have been clusterId!") + } + if response.ClusterAuthorizedOperations != 234 { + t.Error("Decoding produced", response.ClusterAuthorizedOperations, "should have been 234!") + } + if len(response.Topics) != 1 { + t.Error("Decoding produced", len(response.Topics), "should have been 1!") + } + if response.Topics[0].TopicAuthorizedOperations != 345 { + t.Error("Decoding produced", response.Topics[0].TopicAuthorizedOperations, "should have been 345!") + } + if len(response.Topics[0].Partitions[0].OfflineReplicas) != 0 { + t.Error("Decoding produced", len(response.Topics[0].Partitions[0].OfflineReplicas), "should have been 0!") + } + if response.Topics[0].Partitions[0].LeaderEpoch != 123 { + t.Error("Decoding produced", response.Topics[0].Partitions[0].LeaderEpoch, "should have been 123!") + } +} + +func TestMetadataResponseV9(t *testing.T) { + response := MetadataResponse{} + + testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, OneTopicV9, 9) + if response.ThrottleTimeMs != int32(0) { + t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 0!") + } + if len(response.Brokers) != 1 { + t.Error("Decoding produced", response.Brokers, "should have been 1!") + } + if response.Brokers[0].addr != "host:9092" { + t.Error("Decoding produced", response.Brokers[0].addr, "should have been host:9092!") + } + if response.ControllerID != int32(1) { + t.Error("Decoding produced", response.ControllerID, "should have been 1!") + } + if *response.ClusterID != "clusterId" { + t.Error("Decoding produced", response.ClusterID, "should have been clusterId!") + } + if response.ClusterAuthorizedOperations != 234 { + t.Error("Decoding produced", response.ClusterAuthorizedOperations, "should have been 234!") + } + if len(response.Topics) != 1 { + t.Error("Decoding produced", len(response.Topics), "should have been 1!") + } + if response.Topics[0].TopicAuthorizedOperations != 345 { + t.Error("Decoding produced", response.Topics[0].TopicAuthorizedOperations, "should have been 345!") + } + if len(response.Topics[0].Partitions[0].OfflineReplicas) != 0 { + t.Error("Decoding produced", len(response.Topics[0].Partitions[0].OfflineReplicas), "should have been 0!") + } + if response.Topics[0].Partitions[0].LeaderEpoch != 123 { + t.Error("Decoding produced", response.Topics[0].Partitions[0].LeaderEpoch, "should have been 123!") + } +} + +func TestMetadataResponseV10(t *testing.T) { + response := MetadataResponse{} + + testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, OneTopicV10, 10) + if response.ThrottleTimeMs != int32(0) { + t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 0!") + } + if len(response.Brokers) != 1 { + t.Error("Decoding produced", response.Brokers, "should have been 1!") + } + if response.Brokers[0].addr != "host:9092" { + t.Error("Decoding produced", response.Brokers[0].addr, "should have been host:9092!") + } + if response.ControllerID != int32(1) { + t.Error("Decoding produced", response.ControllerID, "should have been 1!") + } + if *response.ClusterID != "clusterId" { + t.Error("Decoding produced", response.ClusterID, "should have been clusterId!") + } + if response.ClusterAuthorizedOperations != 234 { + t.Error("Decoding produced", response.ClusterAuthorizedOperations, "should have been 234!") + } + if len(response.Topics) != 1 { + t.Error("Decoding produced", len(response.Topics), "should have been 1!") + } + if response.Topics[0].Uuid != [16]byte{ + 0x84, 0xcd, 0xa7, 0x55, 0x7e, 0x84, 0x4b, 0xf9, + 0xb7, 0xdc, 0xfc, 0x11, 0x82, 0x07, 0x72, 0x4a, + } { + t.Error("Decoding produced", response.Topics[0].Uuid, "should have been different!") + } + if response.Topics[0].TopicAuthorizedOperations != 345 { + t.Error("Decoding produced", response.Topics[0].TopicAuthorizedOperations, "should have been 345!") + } + if len(response.Topics[0].Partitions[0].OfflineReplicas) != 0 { + t.Error("Decoding produced", len(response.Topics[0].Partitions[0].OfflineReplicas), "should have been 0!") + } + if response.Topics[0].Partitions[0].LeaderEpoch != 123 { + t.Error("Decoding produced", response.Topics[0].Partitions[0].LeaderEpoch, "should have been 123!") + } +}