From 2e5395ba98d73ce8d7f19206e9c59e25e2f04ec7 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 13 Feb 2018 15:18:46 +0000 Subject: [PATCH] Added version to broker.encode() --- broker.go | 9 ++++++++- find_coordinator_response.go | 4 ++-- metadata_response.go | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/broker.go b/broker.go index 2413045b5..a8c5a5b62 100644 --- a/broker.go +++ b/broker.go @@ -624,7 +624,7 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) { return nil } -func (b *Broker) encode(pe packetEncoder) (err error) { +func (b *Broker) encode(pe packetEncoder, version int16) (err error) { host, portstr, err := net.SplitHostPort(b.addr) if err != nil { @@ -644,6 +644,13 @@ func (b *Broker) encode(pe packetEncoder) (err error) { pe.putInt32(int32(port)) + if version >= 1 { + err = pe.putNullableString(b.rack) + if err != nil { + return err + } + } + return nil } diff --git a/find_coordinator_response.go b/find_coordinator_response.go index f2d178f7c..521dff097 100644 --- a/find_coordinator_response.go +++ b/find_coordinator_response.go @@ -36,7 +36,7 @@ func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err e } coordinator := new(Broker) - if err := coordinator.decode(pd); err != nil { + if err := coordinator.decode(pd, 0); err != nil { return err } if coordinator.addr == ":0" { @@ -60,7 +60,7 @@ func (f *FindCoordinatorResponse) encode(pe packetEncoder) error { } } - if err := f.Coordinator.encode(pe); err != nil { + if err := f.Coordinator.encode(pe, 0); err != nil { return err } diff --git a/metadata_response.go b/metadata_response.go index b54d8d40a..8190e8521 100644 --- a/metadata_response.go +++ b/metadata_response.go @@ -179,7 +179,7 @@ func (r *MetadataResponse) encode(pe packetEncoder) error { return err } for _, broker := range r.Brokers { - err = broker.encode(pe) + err = broker.encode(pe, r.Version) if err != nil { return err }