Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose broker metrics with go-metrics #701

Merged
merged 5 commits into from
Aug 30, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/rcrowley/go-metrics"
)

// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
Expand All @@ -26,6 +28,19 @@ type Broker struct {

responses chan responsePromise
done chan bool

incomingByteRate metrics.Meter
requestRate metrics.Meter
requestSize metrics.Histogram
outgoingByteRate metrics.Meter
responseRate metrics.Meter
responseSize metrics.Histogram
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerRequestSize metrics.Histogram
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
}

type responsePromise struct {
Expand Down Expand Up @@ -84,6 +99,24 @@ func (b *Broker) Open(conf *Config) error {

b.conf = conf

// Create or reuse the global metrics shared between brokers
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 {
b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
}

if conf.Net.SASL.Enable {
b.connErr = b.sendAndReceiveSASLPlainAuth()
if b.connErr != nil {
Expand Down Expand Up @@ -338,6 +371,8 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
return nil, err
}

b.updateOutgoingCommunicationMetrics(len(buf))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably want to move this down below the SetWriteDeadline else you could miscount in the (admittedly exceptional) case where that fails


err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
return nil, err
Expand Down Expand Up @@ -471,6 +506,8 @@ func (b *Broker) responseReceiver() {
continue
}

b.updateIncomingCommunicationMetrics(len(header) + len(buf))

response.packets <- buf
}
close(b.done)
Expand Down Expand Up @@ -500,6 +537,8 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
binary.BigEndian.PutUint32(authBytes, uint32(length))
copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))

b.updateOutgoingCommunicationMetrics(len(authBytes))

err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
Expand All @@ -521,6 +560,40 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
return err
}

b.updateIncomingCommunicationMetrics(n)

Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
return nil
}

// updateIncomingCommunicationMetrics records one received response of the
// given size, in bytes, on the response-rate, incoming-byte-rate and
// response-size metrics.
//
// The metrics shared between brokers are always updated. The broker-specific
// ones are updated only when they are non-nil; Open leaves them unset for
// brokers with a negative id.
func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
	size := int64(bytes)

	// Metrics shared between all brokers.
	b.responseRate.Mark(1)
	b.incomingByteRate.Mark(size)
	b.responseSize.Update(size)

	// Broker-specific metrics, each guarded on its own.
	if rate := b.brokerResponseRate; rate != nil {
		rate.Mark(1)
	}
	if byteRate := b.brokerIncomingByteRate; byteRate != nil {
		byteRate.Mark(size)
	}
	if histogram := b.brokerResponseSize; histogram != nil {
		histogram.Update(size)
	}
}

// updateOutgoingCommunicationMetrics records one sent request of the given
// size, in bytes, on the request-rate, outgoing-byte-rate and request-size
// metrics.
//
// The metrics shared between brokers are always updated. The broker-specific
// ones are updated only when they are non-nil; Open leaves them unset for
// brokers with a negative id.
func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
	size := int64(bytes)

	// Metrics shared between all brokers.
	b.requestRate.Mark(1)
	b.outgoingByteRate.Mark(size)
	b.requestSize.Update(size)

	// Broker-specific metrics, each guarded on its own.
	if rate := b.brokerRequestRate; rate != nil {
		rate.Mark(1)
	}
	if byteRate := b.brokerOutgoingByteRate; byteRate != nil {
		byteRate.Mark(size)
	}
	if histogram := b.brokerRequestSize; histogram != nil {
		histogram.Update(size)
	}
}
121 changes: 90 additions & 31 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package sarama
import (
"fmt"
"testing"
"time"

"github.com/rcrowley/go-metrics"
)

func ExampleBroker() {
Expand Down Expand Up @@ -52,36 +55,45 @@ func TestBrokerAccessors(t *testing.T) {
}

func TestSimpleBrokerCommunication(t *testing.T) {
mb := NewMockBroker(t, 0)
defer mb.Close()

broker := NewBroker(mb.Addr())
conf := NewConfig()
conf.Version = V0_10_0_0
err := broker.Open(conf)
if err != nil {
t.Fatal(err)
}

for _, tt := range brokerTestTable {
Logger.Printf("Testing broker communication for %s", tt.name)
mb := NewMockBroker(t, 0)
mb.Returns(&mockEncoder{tt.response})
}
for _, tt := range brokerTestTable {
broker := NewBroker(mb.Addr())
// Set the broker id in order to validate local broker metrics
broker.id = 0
conf := NewConfig()
conf.Version = V0_10_0_0
// Use a new registry every time to prevent side effect caused by the global one
conf.MetricRegistry = metrics.NewRegistry()
err := broker.Open(conf)
if err != nil {
t.Fatal(err)
}
tt.runner(t, broker)
err = broker.Close()
if err != nil {
t.Error(err)
}
// Wait up to 500 ms for the remote broker to process requests
// in order to have consistent metrics
if err := mb.WaitForExpectations(500 * time.Millisecond); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why this is necessary, isn't the request guaranteed to be fully processed by the time tt.runner finishes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use a dedicated MockBroker for each runner; otherwise the statistics used to validate the metrics would contain data from previous runs.

But I found a race condition that often manifests itself for unacknowledged produce request (i.e. RequiredAcks set to NoResponse) but can happen for standard request too when using a MockBroker for each runner.
That is, the runner would return early because the produce request has been written to the socket, and MockBroker.Close() gets invoked before the MockBroker is able to process that request (the request might not even have been read by the MockBroker but already acknowledged by the TCP stack) and therefore consume from the expectations channel. This leads either to an error about an expectation not being satisfied or to the statistics about the bytes written by the MockBroker being 0.

On a regular produce request, the runner would block on reading the response, and by then it is very likely that the MockBroker has consumed its expectations and updated its statistics, but there is a very tight window where the runner would be reading the statistics to validate the metrics before the MockBroker updates them.

My first fix was to add a magic 100 ms sleep for the ProduceRequest (NoResponse), but like every magic sleep that was futile, and then I believe I ran into the second case once, probably because the integration server was loaded.
The working fix I came up with consists of adding that WaitForExpectations method to be sure that we received and processed the expected requests, including updating the statistics.
I added a timeout of 500 ms to be sure that it would not block on a failing test or a very loaded integration server; it generally finishes in under a couple of ms.

I guess it would be good to document those scenarios better on both broker_test.go and mockbroker.go.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The race condition for unacknowledged produce requests is a very nice catch but I'm not sure I understand how it can possibly manifest for any other requests? The runner will not return in those cases until the mock broker has actually responded, which means it has to have consumed the expectation?

If I am understanding the other issue properly, it is because the mockbroker updates its metrics after sending the response, so the test can get 0 metrics if it gets run in that gap? Would not the simpler fix be to block mb.Close() until all metrics are updated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, it should not happen and mb.Close() is the proper fix for having consistent metrics for regular requests and that is actually the way it is done in the last PR commit.

I remembered that corner case while retracing my commits because I got bit by it when reusing original defer mb.Close() that would be executing after checking the metrics (see intermediate commit f9642ad), sorry for the confusion.

t.Error(err)
}
mb.Close()
validateBrokerMetrics(t, broker, mb)
}

err = broker.Close()
if err != nil {
t.Error(err)
}
}

// We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
var brokerTestTable = []struct {
name string
response []byte
runner func(*testing.T, *Broker)
}{
{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
{"MetadataRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := MetadataRequest{}
response, err := broker.GetMetadata(&request)
Expand All @@ -93,7 +105,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
{"ConsumerMetadataRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ConsumerMetadataRequest{}
response, err := broker.GetConsumerMetadata(&request)
Expand All @@ -105,7 +118,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{},
{"ProduceRequest (NoResponse)",
[]byte{},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
request.RequiredAcks = NoResponse
Expand All @@ -118,7 +132,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00},
{"ProduceRequest (WaitForLocal)",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
request.RequiredAcks = WaitForLocal
Expand All @@ -131,7 +146,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00},
{"FetchRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := FetchRequest{}
response, err := broker.Fetch(&request)
Expand All @@ -143,7 +159,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00},
{"OffsetFetchRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetFetchRequest{}
response, err := broker.FetchOffset(&request)
Expand All @@ -155,7 +172,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00},
{"OffsetCommitRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetCommitRequest{}
response, err := broker.CommitOffset(&request)
Expand All @@ -167,7 +185,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00},
{"OffsetRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetRequest{}
response, err := broker.GetAvailableOffsets(&request)
Expand All @@ -179,7 +198,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
{"JoinGroupRequest",
[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := JoinGroupRequest{}
response, err := broker.JoinGroup(&request)
Expand All @@ -191,7 +211,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
{"SyncGroupRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := SyncGroupRequest{}
response, err := broker.SyncGroup(&request)
Expand All @@ -203,7 +224,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00},
{"LeaveGroupRequest",
[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := LeaveGroupRequest{}
response, err := broker.LeaveGroup(&request)
Expand All @@ -215,7 +237,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00},
{"HeartbeatRequest",
[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := HeartbeatRequest{}
response, err := broker.Heartbeat(&request)
Expand All @@ -227,7 +250,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
{"ListGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ListGroupsRequest{}
response, err := broker.ListGroups(&request)
Expand All @@ -239,7 +263,8 @@ var brokerTestTable = []struct {
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00},
{"DescribeGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := DescribeGroupsRequest{}
response, err := broker.DescribeGroups(&request)
Expand All @@ -251,3 +276,37 @@ var brokerTestTable = []struct {
}
}},
}

// validateBrokerMetrics checks the metrics found in broker.conf.MetricRegistry
// against the traffic recorded in the mock broker's history.
//
// It expects exactly one request to have been sent. It expects one response
// to have been received, unless the mock broker wrote no bytes at all, in
// which case no response is expected.
func validateBrokerMetrics(t *testing.T, broker *Broker, mockBroker *MockBroker) {
	validators := newMetricValidators()

	// Total the socket bytes seen by the mock broker.
	var bytesRead, bytesWritten int
	for _, exchange := range mockBroker.History() {
		bytesRead += exchange.RequestSize
		bytesWritten += exchange.ResponseSize
	}

	// Check that the number of bytes received corresponds to what the mock broker wrote
	validators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", bytesWritten))
	if bytesWritten != 0 {
		validators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
		validators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
	} else {
		// This is a ProduceRequest with NoResponse
		validators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
		validators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
	}
	// With no response bytesWritten is 0, so the expected bounds are 0 and 0.
	validators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", bytesWritten, bytesWritten))

	// Check that the number of bytes sent corresponds to what the mock broker read
	validators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", bytesRead))
	validators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
	validators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
	validators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", bytesRead, bytesRead))

	// Run the validators
	validators.run(t, broker.conf.MetricRegistry)
}
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"crypto/tls"
"regexp"
"time"

"github.com/rcrowley/go-metrics"
)

const defaultClientID = "sarama"
Expand Down Expand Up @@ -233,6 +235,10 @@ type Config struct {
// latest features. Setting it to a version greater than you are actually
// running may lead to random breakage.
Version KafkaVersion
// The registry to define metrics into.
// Defaults to metrics.DefaultRegistry.
// See Examples on how to use the metrics registry
MetricRegistry metrics.Registry
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be null? Should it be, for users who don't want metrics? If not, should we validate that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it must not be null as it would panic when invoking methods on it.
Although we could add if ... != nil conditions when using the registry I think the official way is to set metrics.UseNilMetrics to true:
https://godoc.org/github.com/rcrowley/go-metrics#pkg-variables

We could make that setting the default, but I remember the overhead being minimal during my tests, and when you need high performance you generally want the metrics to capture that performance.
The Java client has metrics turned on by default too.

I can update some benchmarks to see the impact of metrics being enabled vs disabled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarks would be nice, but leaving it enabled by default is fine as long as the impact isn't huge.

}

// NewConfig returns a new configuration instance with sane defaults.
Expand Down Expand Up @@ -268,6 +274,7 @@ func NewConfig() *Config {
c.ClientID = defaultClientID
c.ChannelBufferSize = 256
c.Version = minVersion
c.MetricRegistry = metrics.DefaultRegistry

return c
}
Expand Down
Loading