Skip to content

Commit

Permalink
Merge pull request #413 from Shopify/consumer-metadata-response-broker
Browse files Browse the repository at this point in the history
Automatically construct a broker from the ConsumerMetadataResponse
  • Loading branch information
wvanbergen committed Apr 10, 2015
2 parents fcf765a + 5d12edf commit 92d625e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
26 changes: 18 additions & 8 deletions consumer_metadata_response.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package sarama

import (
"net"
"strconv"
)

type ConsumerMetadataResponse struct {
Err KError
CoordinatorID int32
CoordinatorHost string
CoordinatorPort int32
Coordinator *Broker
CoordinatorID int32 // deprecated: use Coordinator.ID()
CoordinatorHost string // deprecated: use Coordinator.Addr()
CoordinatorPort int32 // deprecated: use Coordinator.Addr()
}

func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
Expand All @@ -14,20 +20,24 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
}
r.Err = KError(tmp)

r.CoordinatorID, err = pd.getInt32()
if err != nil {
r.Coordinator = new(Broker)
if err := r.Coordinator.decode(pd); err != nil {
return err
}

r.CoordinatorHost, err = pd.getString()
// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
// backwards compatibility
host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
if err != nil {
return err
}

r.CoordinatorPort, err = pd.getInt32()
port, err := strconv.ParseInt(portstr, 10, 32)
if err != nil {
return err
}
r.CoordinatorID = r.Coordinator.ID()
r.CoordinatorHost = host
r.CoordinatorPort = int32(port)

return nil
}
8 changes: 8 additions & 0 deletions consumer_metadata_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,12 @@ func TestConsumerMetadataResponseSuccess(t *testing.T) {
if response.CoordinatorPort != 0xCCDD {
t.Error("Decoding produced incorrect coordinator port.")
}

if response.Coordinator.ID() != 0xAB {
t.Error("Decoding produced incorrect coordinator ID.")
}

if response.Coordinator.Addr() != "foo:52445" {
t.Error("Decoding produced incorrect coordinator address.")
}
}

0 comments on commit 92d625e

Please sign in to comment.