rpc: add heartbeat metrics
This PR adds metrics to the rpc context. The initial set of metrics includes
gauges to track the current state of heartbeat loops as well as counters for
the number of loops started and failed. The gauges give a point-in-time view
of the state of a server's connections but are subject to aliasing, while the
counters do not capture how many of the connections are currently observing
problems. Taken together they ought to provide some insight into the state of
network health between nodes.

Release note: None
ajwerner committed May 7, 2019
1 parent 32801f5 commit 4850713
Showing 4 changed files with 243 additions and 11 deletions.
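
As context for the diffs below, here is a minimal sketch, outside the diff itself, of how the new metrics can be wired up and read. The helper name exportRPCMetrics and the rpcCtx parameter are illustrative only; Metrics(), the gauge/counter accessors, and the registry call mirror the code added in this commit.

package rpc

import "github.com/cockroachdb/cockroach/pkg/util/metric"

// exportRPCMetrics is an illustrative helper showing how the Context metrics
// are registered and read.
func exportRPCMetrics(rpcCtx *Context) (nominal, started int64) {
	registry := metric.NewRegistry()
	registry.AddMetricStruct(rpcCtx.Metrics()) // same pattern as the server.go hunk below
	// Gauges give a point-in-time view of heartbeat loop state (subject to aliasing).
	nominal = rpcCtx.Metrics().HeartbeatsNominal.Value()
	// Counters are monotonic, so churn between scrapes remains visible.
	started = rpcCtx.Metrics().HeartbeatLoopsStarted.Count()
	return nominal, started
}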
39 changes: 35 additions & 4 deletions pkg/rpc/context.go
@@ -291,6 +291,19 @@ type heartbeatResult struct {
err error // heartbeat error. should not be nil if everSucceeded is false
}

// state is a helper to return the heartbeatState implied by a heartbeatResult.
func (hr heartbeatResult) state() (s heartbeatState) {
switch {
case !hr.everSucceeded && hr.err != nil:
s = heartbeatInitializing
case hr.everSucceeded && hr.err == nil:
s = heartbeatNominal
case hr.everSucceeded && hr.err != nil:
s = heartbeatFailed
}
return s
}

// Connection is a wrapper around grpc.ClientConn. It prevents the underlying
// connection from being used until it has been validated via heartbeat.
type Connection struct {
@@ -383,6 +396,8 @@ type Context struct {
NodeID base.NodeIDContainer
version *cluster.ExposedClusterVersion

metrics Metrics

// For unittesting.
BreakerFactory func() *circuit.Breaker
testingDialOpts []grpc.DialOption
@@ -427,6 +442,7 @@ func NewContext(
ctx.RemoteClocks = newRemoteClockMonitor(
ctx.LocalClock, 10*ctx.heartbeatInterval, baseCtx.HistogramWindowInterval)
ctx.heartbeatTimeout = 2 * ctx.heartbeatInterval
ctx.metrics = makeMetrics()

stopper.RunWorker(ctx.masterCtx, func(context.Context) {
<-stopper.ShouldQuiesce()
@@ -457,6 +473,11 @@ func (ctx *Context) GetStatsMap() *syncmap.Map {
return &ctx.stats.stats
}

// Metrics returns the Context's Metrics struct.
func (ctx *Context) Metrics() *Metrics {
return &ctx.metrics
}

// GetLocalInternalClientForAddr returns the context's internal batch client
// for target, if it exists.
func (ctx *Context) GetLocalInternalClientForAddr(
@@ -818,7 +839,15 @@ func (ctx *Context) TestingConnHealth(target string, nodeID roachpb.NodeID) error

func (ctx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
) error {
) (retErr error) {
ctx.metrics.HeartbeatLoopsStarted.Inc(1)
state := updateHeartbeatState(&ctx.metrics, heartbeatNotRunning, heartbeatInitializing)
defer func() {
if retErr != nil {
ctx.metrics.HeartbeatLoopsExited.Inc(1)
}
updateHeartbeatState(&ctx.metrics, state, heartbeatNotRunning)
}()
maxOffset := ctx.LocalClock.MaxOffset()
maxOffsetNanos := maxOffset.Nanoseconds()

@@ -897,12 +926,14 @@ func (ctx *Context) runHeartbeat(
cb()
}
}
conn.heartbeatResult.Store(heartbeatResult{

hr := heartbeatResult{
everSucceeded: everSucceeded,
err: err,
})
}
state = updateHeartbeatState(&ctx.metrics, state, hr.state())
conn.heartbeatResult.Store(hr)
conn.setInitialHeartbeatDone()

return nil
}); err != nil {
return err
76 changes: 69 additions & 7 deletions pkg/rpc/context_test.go
@@ -196,7 +196,7 @@ func TestHeartbeatHealth(t *testing.T) {
const serverNodeID = 1
const clientNodeID = 2

serverCtx := newTestContext(clusterID, clock, stopper)
serverCtx := newTestContext(clusterID, clock, stop.NewStopper())
serverCtx.NodeID.Set(context.TODO(), serverNodeID)
s := newTestServer(t, serverCtx)

@@ -210,12 +210,6 @@ func TestHeartbeatHealth(t *testing.T) {
}
RegisterHeartbeatServer(s, heartbeat)

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
if err != nil {
t.Fatal(err)
}
remoteAddr := ln.Addr().String()

errFailedHeartbeat := errors.New("failed heartbeat")

var hbSuccess atomic.Value
@@ -257,6 +251,12 @@ func TestHeartbeatHealth(t *testing.T) {
clientCtx.AdvertiseAddr = lisLocalServer.Addr().String()
// Make the interval shorter to speed up the test.
clientCtx.heartbeatInterval = 1 * time.Millisecond

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
if err != nil {
t.Fatal(err)
}
remoteAddr := ln.Addr().String()
if _, err := clientCtx.GRPCDialNode(
remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
t.Fatal(err)
@@ -270,6 +270,8 @@ func TestHeartbeatHealth(t *testing.T) {
}
return err
})
assertGauges(t, clientCtx.Metrics(),
0 /* initializing */, 1 /* nominal */, 0 /* failed */)

// Should be unhealthy in the presence of failing heartbeats.
hbSuccess.Store(false)
@@ -279,12 +281,16 @@ func TestHeartbeatHealth(t *testing.T) {
}
return nil
})
assertGauges(t, clientCtx.Metrics(),
0 /* initializing */, 0 /* nominal */, 1 /* failed */)

// Should become healthy in the presence of successful heartbeats.
hbSuccess.Store(true)
testutils.SucceedsSoon(t, func() error {
return clientCtx.TestingConnHealth(remoteAddr, serverNodeID)
})
assertGauges(t, clientCtx.Metrics(),
0 /* initializing */, 1 /* nominal */, 0 /* failed */)

// Should become unhealthy again in the presence of failing heartbeats.
hbSuccess.Store(false)
@@ -294,12 +300,18 @@ func TestHeartbeatHealth(t *testing.T) {
}
return nil
})
assertGauges(t, clientCtx.Metrics(),
0 /* initializing */, 0 /* nominal */, 1 /* failed */)

// Should become healthy in the presence of successful heartbeats.
hbSuccess.Store(true)
testutils.SucceedsSoon(t, func() error {
return clientCtx.TestingConnHealth(remoteAddr, serverNodeID)
})
assertGauges(t, clientCtx.Metrics(),
0 /* initializing */, 1 /* nominal */, 0 /* failed */)

// Ensure that non-existing connections return ErrNotHeartbeated.

lisNonExistentConnection, err := net.Listen("tcp", "127.0.0.1:0")
defer func() {
@@ -311,11 +323,20 @@ func TestHeartbeatHealth(t *testing.T) {
if err := clientCtx.TestingConnHealth(lisNonExistentConnection.Addr().String(), 3); err != ErrNotHeartbeated {
t.Errorf("wanted ErrNotHeartbeated, not %v", err)
}
// The connection to Node 3 on the lisNonExistentConnection should be
// initializing and the server connection should be nominal.
testutils.SucceedsSoon(t, func() error {
return checkGauges(clientCtx.Metrics(),
1 /* initializing */, 1 /* nominal */, 0 /* failed */)
})

if err := clientCtx.TestingConnHealth(clientCtx.Addr, clientNodeID); err != ErrNotHeartbeated {
t.Errorf("wanted ErrNotHeartbeated, not %v", err)
}

// Ensure that the local Addr returns ErrNotHeartbeated without having dialed
// a connection, but that the local AdvertiseAddr returns no error once an
// internal server has been registered.
clientCtx.SetLocalInternalServer(&internalServer{})

if err := clientCtx.TestingConnHealth(clientCtx.Addr, clientNodeID); err != ErrNotHeartbeated {
@@ -324,6 +345,47 @@ func TestHeartbeatHealth(t *testing.T) {
if err := clientCtx.TestingConnHealth(clientCtx.AdvertiseAddr, clientNodeID); err != nil {
t.Error(err)
}

// Ensure that when the server closes its connection the context attempts to
// reconnect. Both the server connection on Node 1 and the non-existent
// connection should be initializing.
serverCtx.Stopper.Stop(context.Background())
testutils.SucceedsSoon(t, func() error {
return checkGauges(clientCtx.Metrics(),
2 /* initializing */, 0 /* nominal */, 0 /* failed */)
})
const expNumStarted = 3 // 2 for the server and 1 for the non-existent conn
numStarted := clientCtx.Metrics().HeartbeatLoopsStarted.Count()
if numStarted != expNumStarted {
t.Fatalf("expected %d heartbeat loops to have been started, got %d",
expNumStarted, numStarted)
}
const expNumExited = 1 // 1 for the server upon shutdown
numExited := clientCtx.Metrics().HeartbeatLoopsExited.Count()
if numExited != expNumExited {
t.Fatalf("expected %d heartbeat loops to have exited, got %d",
expNumExited, numExited)
}
}

func checkGauges(m *Metrics, initializing, nominal, failed int64) error {
if got := m.HeartbeatsInitializing.Value(); got != initializing {
return errors.Errorf("expected %d initializing heartbeats, got %d", initializing, got)
}
if got := m.HeartbeatsNominal.Value(); got != nominal {
return errors.Errorf("expected %d nominal heartbeats, got %d", nominal, got)
}
if got := m.HeartbeatsFailed.Value(); got != failed {
return errors.Errorf("expected %d failed heartbeats, got %d", failed, got)
}
return nil
}

func assertGauges(t *testing.T, m *Metrics, initializing, nominal, failed int64) {
t.Helper()
if err := checkGauges(m, initializing, nominal, failed); err != nil {
t.Error(err)
}
}

// TestConnectionRemoveNodeIDZero verifies that when a connection initiated via
138 changes: 138 additions & 0 deletions pkg/rpc/metrics.go
@@ -0,0 +1,138 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package rpc

import "github.com/cockroachdb/cockroach/pkg/util/metric"

// We want to have a way to track the number of connections,
// but we also want a way to know the health of those connections.
//
// For this we're going to add a variety of metrics:
// gauges of how many heartbeat loops are in each state,
// and counters for heartbeat loops started and exited.

var (
// The below gauges store the current state of running heartbeat loops.
// Gauges are useful for examining the current state of a system but can hide
// information in the face of rapidly changing values. The context
// additionally keeps counters for the number of heartbeat loops started
// and the number which have exited with an error.
// Together these metrics should provide a picture of the state of current
// connections.

metaHeartbeatsInitializing = metric.Metadata{
Name: "rpc.heartbeats.initializing",
Help: "Gauge of current connections in the initializing state",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
metaHeartbeatsNominal = metric.Metadata{
Name: "rpc.heartbeats.nominal",
Help: "Gauge of current connections in the nominal state",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
metaHeartbeatsFailed = metric.Metadata{
Name: "rpc.heartbeats.failed",
Help: "Gauge of current connections in the failed state",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}

metaHeartbeatLoopsStarted = metric.Metadata{
Name: "rpc.heartbeats.loops.started",
Help: "Counter of the number of connection heartbeat loops which " +
"have been started",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
metaHeartbeatLoopsExited = metric.Metadata{
Name: "rpc.heartbeats.loops.exited",
Help: "Counter of the number of connection heartbeat loops which " +
"have exited with an error",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
)

type heartbeatState int

const (
heartbeatNotRunning heartbeatState = iota
heartbeatInitializing
heartbeatNominal
heartbeatFailed
)

func makeMetrics() Metrics {
return Metrics{
HeartbeatLoopsStarted: metric.NewCounter(metaHeartbeatLoopsStarted),
HeartbeatLoopsExited: metric.NewCounter(metaHeartbeatLoopsExited),
HeartbeatsInitializing: metric.NewGauge(metaHeartbeatsInitializing),
HeartbeatsNominal: metric.NewGauge(metaHeartbeatsNominal),
HeartbeatsFailed: metric.NewGauge(metaHeartbeatsFailed),
}
}

// Metrics is a metrics struct for Context metrics.
type Metrics struct {

// HeartbeatLoopsStarted is a counter which tracks the number of heartbeat
// loops which have been started.
HeartbeatLoopsStarted *metric.Counter

// HeartbeatLoopsExited is a counter which tracks the number of heartbeat
// loops which have exited with an error. The only time a heartbeat loop
// exits without an error is during server shutdown.
HeartbeatLoopsExited *metric.Counter

// HeartbeatsInitializing tracks the current number of heartbeat loops
// which have not yet ever succeeded.
HeartbeatsInitializing *metric.Gauge
// HeartbeatsNominal tracks the current number of heartbeat loops which
// succeeded on their previous attempt.
HeartbeatsNominal *metric.Gauge
// HeartbeatsFailed tracks the current number of heartbeat loops which
// failed on their previous attempt after having succeeded at least once.
HeartbeatsFailed *metric.Gauge
}

// updateHeartbeatState decrements the gauge for the current state and
// increments the gauge for the new state, returning the new state.
func updateHeartbeatState(m *Metrics, old, new heartbeatState) heartbeatState {
if old == new {
return new
}
if g := heartbeatGauge(m, new); g != nil {
g.Inc(1)
}
if g := heartbeatGauge(m, old); g != nil {
g.Dec(1)
}
return new
}

// heartbeatGauge returns the appropriate gauge for the given heartbeatState.
func heartbeatGauge(m *Metrics, s heartbeatState) (g *metric.Gauge) {
switch s {
case heartbeatInitializing:
g = m.HeartbeatsInitializing
case heartbeatNominal:
g = m.HeartbeatsNominal
case heartbeatFailed:
g = m.HeartbeatsFailed
}
return g
}
1 change: 1 addition & 0 deletions pkg/server/server.go
@@ -253,6 +253,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
log.Fatal(ctx, err)
}
}
s.registry.AddMetricStruct(s.rpcContext.Metrics())

s.grpc = newGRPCServer(s.rpcContext)

