Skip to content

Commit

Permalink
Merge pull request #1069 from mimaison/metadata_new_versions
Browse files Browse the repository at this point in the history
Added support for Metadata Request/Response up to v5
  • Loading branch information
eapache authored Apr 2, 2018
2 parents 5e8fd95 + bbdeda9 commit c1ec7c5
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 24 deletions.
26 changes: 21 additions & 5 deletions metadata_request.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package sarama

type MetadataRequest struct {
Version int16
Topics []string
Version int16
Topics []string
AllowAutoTopicCreation bool
}

func (r *MetadataRequest) encode(pe packetEncoder) error {
if r.Version < 0 || r.Version > 1 {
if r.Version < 0 || r.Version > 5 {
return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
}
if r.Version == 0 || r.Topics != nil || len(r.Topics) > 0 {
Expand All @@ -24,6 +25,9 @@ func (r *MetadataRequest) encode(pe packetEncoder) error {
} else {
pe.putInt32(-1)
}
if r.Version > 3 {
pe.putBool(r.AllowAutoTopicCreation)
}
return nil
}

Expand All @@ -49,9 +53,15 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
}
r.Topics[i] = topic
}
return nil
}

if r.Version > 3 {
autoCreation, err := pd.getBool()
if err != nil {
return err
}
r.AllowAutoTopicCreation = autoCreation
}
return nil
}

func (r *MetadataRequest) key() int16 {
Expand All @@ -66,6 +76,12 @@ func (r *MetadataRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_10_0_0
case 2:
return V0_10_1_0
case 3, 4:
return V0_11_0_0
case 5:
return V1_0_0_0
default:
return MinVersion
}
Expand Down
32 changes: 32 additions & 0 deletions metadata_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ var (

metadataRequestNoTopicsV1 = []byte{
0xff, 0xff, 0xff, 0xff}

metadataRequestAutoCreateV4 = append(metadataRequestOneTopicV0, byte(1))
metadataRequestNoAutoCreateV4 = append(metadataRequestOneTopicV0, byte(0))
)

func TestMetadataRequestV0(t *testing.T) {
Expand All @@ -42,3 +45,32 @@ func TestMetadataRequestV1(t *testing.T) {
request.Topics = []string{"foo", "bar", "baz"}
testRequest(t, "three topics", request, metadataRequestThreeTopicsV0)
}

func TestMetadataRequestV2(t *testing.T) {
request := new(MetadataRequest)
request.Version = 2
testRequest(t, "no topics", request, metadataRequestNoTopicsV1)

request.Topics = []string{"topic1"}
testRequest(t, "one topic", request, metadataRequestOneTopicV0)
}

func TestMetadataRequestV3(t *testing.T) {
request := new(MetadataRequest)
request.Version = 3
testRequest(t, "no topics", request, metadataRequestNoTopicsV1)

request.Topics = []string{"topic1"}
testRequest(t, "one topic", request, metadataRequestOneTopicV0)
}

func TestMetadataRequestV4(t *testing.T) {
request := new(MetadataRequest)
request.Version = 4
request.Topics = []string{"topic1"}
request.AllowAutoTopicCreation = true
testRequest(t, "one topic", request, metadataRequestAutoCreateV4)

request.AllowAutoTopicCreation = false
testRequest(t, "one topic", request, metadataRequestNoAutoCreateV4)
}
74 changes: 59 additions & 15 deletions metadata_response.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package sarama

type PartitionMetadata struct {
Err KError
ID int32
Leader int32
Replicas []int32
Isr []int32
Err KError
ID int32
Leader int32
Replicas []int32
Isr []int32
OfflineReplicas []int32
}

func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
return err
Expand All @@ -35,10 +36,17 @@ func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
return err
}

if version >= 5 {
pm.OfflineReplicas, err = pd.getInt32Array()
if err != nil {
return err
}
}

return nil
}

func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) {
func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(int16(pm.Err))
pe.putInt32(pm.ID)
pe.putInt32(pm.Leader)
Expand All @@ -53,6 +61,13 @@ func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) {
return err
}

if version >= 5 {
err = pe.putInt32Array(pm.OfflineReplicas)
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -89,7 +104,7 @@ func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
tm.Partitions = make([]*PartitionMetadata, n)
for i := 0; i < n; i++ {
tm.Partitions[i] = new(PartitionMetadata)
err = tm.Partitions[i].decode(pd)
err = tm.Partitions[i].decode(pd, version)
if err != nil {
return err
}
Expand All @@ -116,7 +131,7 @@ func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
}

for _, pm := range tm.Partitions {
err = pm.encode(pe)
err = pm.encode(pe, version)
if err != nil {
return err
}
Expand All @@ -126,13 +141,24 @@ func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
}

type MetadataResponse struct {
Version int16
Brokers []*Broker
ControllerID int32
Topics []*TopicMetadata
Version int16
ThrottleTimeMs int32
Brokers []*Broker
ClusterID *string
ControllerID int32
Topics []*TopicMetadata
}

func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version

if version >= 3 {
r.ThrottleTimeMs, err = pd.getInt32()
if err != nil {
return err
}
}

n, err := pd.getArrayLength()
if err != nil {
return err
Expand All @@ -147,6 +173,13 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
}
}

if version >= 2 {
r.ClusterID, err = pd.getNullableString()
if err != nil {
return err
}
}

if version >= 1 {
r.ControllerID, err = pd.getInt32()
if err != nil {
Expand Down Expand Up @@ -208,11 +241,22 @@ func (r *MetadataResponse) key() int16 {
}

func (r *MetadataResponse) version() int16 {
return 0
return r.Version
}

func (r *MetadataResponse) requiredVersion() KafkaVersion {
return MinVersion
switch r.Version {
case 1:
return V0_10_0_0
case 2:
return V0_10_1_0
case 3, 4:
return V0_11_0_0
case 5:
return V1_0_0_0
default:
return MinVersion
}
}

// testing API
Expand Down
78 changes: 74 additions & 4 deletions metadata_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,31 @@ var (
0x00, 0x03, 'b', 'a', 'r',
0x01,
0x00, 0x00, 0x00, 0x00}

noBrokersNoTopicsWithThrottleTimeAndClusterIDV3 = []byte{
0x00, 0x00, 0x00, 0x10,
0x00, 0x00, 0x00, 0x00,
0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00}

noBrokersOneTopicWithOfflineReplicasV5 = []byte{
0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd',
0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x03, 'f', 'o', 'o',
0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x04,
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x07,
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03,
}
)

func TestEmptyMetadataResponseV0(t *testing.T) {
Expand Down Expand Up @@ -206,15 +231,60 @@ func TestMetadataResponseWithTopicsV1(t *testing.T) {
t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
}
if response.ControllerID != 4 {
t.Error("Decoding produced", len(response.Brokers), "should have been 4!")
t.Error("Decoding produced", response.ControllerID, "should have been 4!")
}
if len(response.Topics) != 2 {
t.Error("Decoding produced", len(response.Brokers), "topics where there were 2!")
t.Error("Decoding produced", len(response.Topics), "topics where there were 2!")
}
if response.Topics[0].IsInternal {
t.Error("Decoding produced", response.ControllerID, "topic0 should have been false!")
t.Error("Decoding produced", response.Topics[0], "topic0 should have been false!")
}
if !response.Topics[1].IsInternal {
t.Error("Decoding produced", response.ControllerID, "topic1 should have been true!")
t.Error("Decoding produced", response.Topics[1], "topic1 should have been true!")
}
}

func TestMetadataResponseWithThrottleTime(t *testing.T) {
response := MetadataResponse{}

testVersionDecodable(t, "no topics, no brokers, throttle time and cluster Id V3", &response, noBrokersNoTopicsWithThrottleTimeAndClusterIDV3, 3)
if response.ThrottleTimeMs != int32(16) {
t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 16!")
}
if len(response.Brokers) != 0 {
t.Error("Decoding produced", response.Brokers, "should have been 0!")
}
if response.ControllerID != int32(1) {
t.Error("Decoding produced", response.ControllerID, "should have been 1!")
}
if *response.ClusterID != "clusterId" {
t.Error("Decoding produced", response.ClusterID, "should have been clusterId!")
}
if len(response.Topics) != 0 {
t.Error("Decoding produced", len(response.Topics), "should have been 0!")
}
}

func TestMetadataResponseWithOfflineReplicasV5(t *testing.T) {
response := MetadataResponse{}

testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, noBrokersOneTopicWithOfflineReplicasV5, 5)
if response.ThrottleTimeMs != int32(5) {
t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 5!")
}
if len(response.Brokers) != 0 {
t.Error("Decoding produced", response.Brokers, "should have been 0!")
}
if response.ControllerID != int32(2) {
t.Error("Decoding produced", response.ControllerID, "should have been 21!")
}
if *response.ClusterID != "clusterId" {
t.Error("Decoding produced", response.ClusterID, "should have been clusterId!")
}
if len(response.Topics) != 1 {
t.Error("Decoding produced", len(response.Topics), "should have been 1!")
}
if len(response.Topics[0].Partitions[0].OfflineReplicas) != 1 {
t.Error("Decoding produced", len(response.Topics[0].Partitions[0].OfflineReplicas), "should have been 1!")
}
}

0 comments on commit c1ec7c5

Please sign in to comment.