Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for Metadata Request/Response v1 #1047

Merged
merged 2 commits into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type Broker struct {
id int32
addr string
rack *string

conf *Config
correlationID int32
Expand Down Expand Up @@ -592,7 +593,7 @@ func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
}
}

func (b *Broker) decode(pd packetDecoder) (err error) {
func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
b.id, err = pd.getInt32()
if err != nil {
return err
Expand All @@ -608,6 +609,13 @@ func (b *Broker) decode(pd packetDecoder) (err error) {
return err
}

if version >= 1 {
b.rack, err = pd.getNullableString()
if err != nil {
return err
}
}

b.addr = net.JoinHostPort(host, fmt.Sprint(port))
if _, _, err := net.SplitHostPort(b.addr); err != nil {
return err
Expand All @@ -616,7 +624,7 @@ func (b *Broker) decode(pd packetDecoder) (err error) {
return nil
}

func (b *Broker) encode(pe packetEncoder) (err error) {
func (b *Broker) encode(pe packetEncoder, version int16) (err error) {

host, portstr, err := net.SplitHostPort(b.addr)
if err != nil {
Expand All @@ -636,6 +644,13 @@ func (b *Broker) encode(pe packetEncoder) (err error) {

pe.putInt32(int32(port))

if version >= 1 {
err = pe.putNullableString(b.rack)
if err != nil {
return err
}
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions find_coordinator_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err e
}

coordinator := new(Broker)
if err := coordinator.decode(pd); err != nil {
if err := coordinator.decode(pd, 0); err != nil {
return err
}
if coordinator.addr == ":0" {
Expand All @@ -60,7 +60,7 @@ func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
}
}

if err := f.Coordinator.encode(pe); err != nil {
if err := f.Coordinator.encode(pe, 0); err != nil {
return err
}

Expand Down
58 changes: 39 additions & 19 deletions metadata_request.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,72 @@
package sarama

type MetadataRequest struct {
Topics []string
Version int16
Topics []string
}

func (r *MetadataRequest) encode(pe packetEncoder) error {
err := pe.putArrayLength(len(r.Topics))
if err != nil {
return err
if r.Version < 0 || r.Version > 1 {
return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
}

for i := range r.Topics {
err = pe.putString(r.Topics[i])
if r.Version == 0 || r.Topics != nil || len(r.Topics) > 0 {
err := pe.putArrayLength(len(r.Topics))
if err != nil {
return err
}

for i := range r.Topics {
err = pe.putString(r.Topics[i])
if err != nil {
return err
}
}
} else {
pe.putInt32(-1)
}
return nil
}

func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
topicCount, err := pd.getArrayLength()
r.Version = version
size, err := pd.getInt32()
if err != nil {
return err
}
if topicCount == 0 {
if size < 0 {
return nil
}
} else {
topicCount := size
if topicCount == 0 {
return nil
}

r.Topics = make([]string, topicCount)
for i := range r.Topics {
topic, err := pd.getString()
if err != nil {
return err
r.Topics = make([]string, topicCount)
for i := range r.Topics {
topic, err := pd.getString()
if err != nil {
return err
}
r.Topics[i] = topic
}
r.Topics[i] = topic
return nil
}
return nil

}

func (r *MetadataRequest) key() int16 {
return 3
}

func (r *MetadataRequest) version() int16 {
return 0
return r.Version
}

func (r *MetadataRequest) requiredVersion() KafkaVersion {
return MinVersion
switch r.Version {
case 1:
return V0_10_0_0
default:
return MinVersion
}
}
29 changes: 22 additions & 7 deletions metadata_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,42 @@ package sarama
import "testing"

var (
metadataRequestNoTopics = []byte{
metadataRequestNoTopicsV0 = []byte{
0x00, 0x00, 0x00, 0x00}

metadataRequestOneTopic = []byte{
metadataRequestOneTopicV0 = []byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x06, 't', 'o', 'p', 'i', 'c', '1'}

metadataRequestThreeTopics = []byte{
metadataRequestThreeTopicsV0 = []byte{
0x00, 0x00, 0x00, 0x03,
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x03, 'b', 'a', 'r',
0x00, 0x03, 'b', 'a', 'z'}

metadataRequestNoTopicsV1 = []byte{
0xff, 0xff, 0xff, 0xff}
)

func TestMetadataRequest(t *testing.T) {
func TestMetadataRequestV0(t *testing.T) {
request := new(MetadataRequest)
testRequest(t, "no topics", request, metadataRequestNoTopicsV0)

request.Topics = []string{"topic1"}
testRequest(t, "one topic", request, metadataRequestOneTopicV0)

request.Topics = []string{"foo", "bar", "baz"}
testRequest(t, "three topics", request, metadataRequestThreeTopicsV0)
}

func TestMetadataRequestV1(t *testing.T) {
request := new(MetadataRequest)
testRequest(t, "no topics", request, metadataRequestNoTopics)
request.Version = 1
testRequest(t, "no topics", request, metadataRequestNoTopicsV1)

request.Topics = []string{"topic1"}
testRequest(t, "one topic", request, metadataRequestOneTopic)
testRequest(t, "one topic", request, metadataRequestOneTopicV0)

request.Topics = []string{"foo", "bar", "baz"}
testRequest(t, "three topics", request, metadataRequestThreeTopics)
testRequest(t, "three topics", request, metadataRequestThreeTopicsV0)
}
43 changes: 35 additions & 8 deletions metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) {
type TopicMetadata struct {
Err KError
Name string
IsInternal bool // Only valid for Version >= 1
Partitions []*PartitionMetadata
}

func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
return err
Expand All @@ -74,6 +75,13 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
return err
}

if version >= 1 {
tm.IsInternal, err = pd.getBool()
if err != nil {
return err
}
}

n, err := pd.getArrayLength()
if err != nil {
return err
Expand All @@ -90,14 +98,18 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
return nil
}

func (tm *TopicMetadata) encode(pe packetEncoder) (err error) {
func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(int16(tm.Err))

err = pe.putString(tm.Name)
if err != nil {
return err
}

if version >= 1 {
pe.putBool(tm.IsInternal)
}

err = pe.putArrayLength(len(tm.Partitions))
if err != nil {
return err
Expand All @@ -114,8 +126,10 @@ func (tm *TopicMetadata) encode(pe packetEncoder) (err error) {
}

type MetadataResponse struct {
Brokers []*Broker
Topics []*TopicMetadata
Version int16
Brokers []*Broker
ControllerID int32
Topics []*TopicMetadata
}

func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
Expand All @@ -127,12 +141,21 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
r.Brokers = make([]*Broker, n)
for i := 0; i < n; i++ {
r.Brokers[i] = new(Broker)
err = r.Brokers[i].decode(pd)
err = r.Brokers[i].decode(pd, version)
if err != nil {
return err
}
}

if version >= 1 {
r.ControllerID, err = pd.getInt32()
if err != nil {
return err
}
} else {
r.ControllerID = -1
}

n, err = pd.getArrayLength()
if err != nil {
return err
Expand All @@ -141,7 +164,7 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
r.Topics = make([]*TopicMetadata, n)
for i := 0; i < n; i++ {
r.Topics[i] = new(TopicMetadata)
err = r.Topics[i].decode(pd)
err = r.Topics[i].decode(pd, version)
if err != nil {
return err
}
Expand All @@ -156,18 +179,22 @@ func (r *MetadataResponse) encode(pe packetEncoder) error {
return err
}
for _, broker := range r.Brokers {
err = broker.encode(pe)
err = broker.encode(pe, r.Version)
if err != nil {
return err
}
}

if r.Version >= 1 {
pe.putInt32(r.ControllerID)
}

err = pe.putArrayLength(len(r.Topics))
if err != nil {
return err
}
for _, tm := range r.Topics {
err = tm.encode(pe)
err = tm.encode(pe, r.Version)
if err != nil {
return err
}
Expand Down
Loading