Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose broker metrics with go-metrics #701

Merged
merged 5 commits into from
Aug 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 76 additions & 4 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/rcrowley/go-metrics"
)

// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
Expand All @@ -26,6 +28,19 @@ type Broker struct {

responses chan responsePromise
done chan bool

incomingByteRate metrics.Meter
requestRate metrics.Meter
requestSize metrics.Histogram
outgoingByteRate metrics.Meter
responseRate metrics.Meter
responseSize metrics.Histogram
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerRequestSize metrics.Histogram
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
}

type responsePromise struct {
Expand Down Expand Up @@ -84,6 +99,24 @@ func (b *Broker) Open(conf *Config) error {

b.conf = conf

// Create or reuse the global metrics shared between brokers
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 {
b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
}

if conf.Net.SASL.Enable {
b.connErr = b.sendAndReceiveSASLPlainAuth()
if b.connErr != nil {
Expand Down Expand Up @@ -343,7 +376,8 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
return nil, err
}

_, err = b.conn.Write(buf)
bytes, err := b.conn.Write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -441,8 +475,9 @@ func (b *Broker) responseReceiver() {
continue
}

_, err = io.ReadFull(b.conn, header)
bytesReadHeader, err := io.ReadFull(b.conn, header)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could just do this immediately after calling ReadFull rather than in all three error branches?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but then I would have to call it twice for the regular case where we read the header and then read the body.
This would result in the same metric for byte rate but will double the rate for response rate metric and mess up with response size histogram metric too.

That being said I think using defer with a totalBytesRead variable might simplify the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot that this code is running inside a for loop so using defer would require wrapping the for code block inside an function, not sure if it makes it easier to read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, OK I missed the fact it would double-count packets in the success case. This is fine as-is.

dead = err
response.errors <- err
continue
Expand All @@ -451,11 +486,13 @@ func (b *Broker) responseReceiver() {
decodedHeader := responseHeader{}
err = decode(header, &decodedHeader)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
dead = err
response.errors <- err
continue
}
if decodedHeader.correlationID != response.correlationID {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
// TODO if decoded ID < cur ID, discard until we catch up
// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
Expand All @@ -464,7 +501,8 @@ func (b *Broker) responseReceiver() {
}

buf := make([]byte, decodedHeader.length-4)
_, err = io.ReadFull(b.conn, buf)
bytesReadBody, err := io.ReadFull(b.conn, buf)
b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody)
if err != nil {
dead = err
response.errors <- err
Expand Down Expand Up @@ -506,14 +544,16 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
return err
}

_, err = b.conn.Write(authBytes)
bytesWritten, err := b.conn.Write(authBytes)
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}

header := make([]byte, 4)
n, err := io.ReadFull(b.conn, header)
b.updateIncomingCommunicationMetrics(n)
// If the credentials are valid, we would get a 4 byte response filled with null characters.
// Otherwise, the broker closes the connection and we get an EOF
if err != nil {
Expand All @@ -524,3 +564,35 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
return nil
}

func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
b.responseRate.Mark(1)
if b.brokerResponseRate != nil {
b.brokerResponseRate.Mark(1)
}
responseSize := int64(bytes)
b.incomingByteRate.Mark(responseSize)
if b.brokerIncomingByteRate != nil {
b.brokerIncomingByteRate.Mark(responseSize)
}
b.responseSize.Update(responseSize)
if b.brokerResponseSize != nil {
b.brokerResponseSize.Update(responseSize)
}
}

func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
b.requestRate.Mark(1)
if b.brokerRequestRate != nil {
b.brokerRequestRate.Mark(1)
}
requestSize := int64(bytes)
b.outgoingByteRate.Mark(requestSize)
if b.brokerOutgoingByteRate != nil {
b.brokerOutgoingByteRate.Mark(requestSize)
}
b.requestSize.Update(requestSize)
if b.brokerRequestSize != nil {
b.brokerRequestSize.Update(requestSize)
}
}
Loading