From 4850713b64e397ea2c1343e480365f111c8265b8 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 7 May 2019 10:42:18 -0400 Subject: [PATCH] rpc: add heartbeat metrics This PR adds metrics to the rpc context. The initial set of metrics includes gauges to track the current state of heartbeat loops as well as counters for the number of loops started and failed. The gauges give a nice point-in-time view of the state of a server's connections but are subject to aliasing while the counters fail to capture how many of the connections are actually observing problems. Taken together they ought to provide some insight into the state of network health between nodes. Release note: None --- pkg/rpc/context.go | 39 ++++++++++-- pkg/rpc/context_test.go | 76 ++++++++++++++++++++-- pkg/rpc/metrics.go | 138 ++++++++++++++++++++++++++++++++++++++++ pkg/server/server.go | 1 + 4 files changed, 243 insertions(+), 11 deletions(-) create mode 100644 pkg/rpc/metrics.go diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index af95ac9366e6..ac4fa60228b8 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -291,6 +291,19 @@ type heartbeatResult struct { err error // heartbeat error. should not be nil if everSucceeded is false } +// state is a helper to return the heartbeatState implied by a heartbeatResult. +func (hr heartbeatResult) state() (s heartbeatState) { + switch { + case !hr.everSucceeded && hr.err != nil: + s = heartbeatInitializing + case hr.everSucceeded && hr.err == nil: + s = heartbeatNominal + case hr.everSucceeded && hr.err != nil: + s = heartbeatFailed + } + return s +} + // Connection is a wrapper around grpc.ClientConn. It prevents the underlying // connection from being used until it has been validated via heartbeat. type Connection struct { @@ -383,6 +396,8 @@ type Context struct { NodeID base.NodeIDContainer version *cluster.ExposedClusterVersion + metrics Metrics + // For unittesting. 
BreakerFactory func() *circuit.Breaker testingDialOpts []grpc.DialOption @@ -427,6 +442,7 @@ func NewContext( ctx.RemoteClocks = newRemoteClockMonitor( ctx.LocalClock, 10*ctx.heartbeatInterval, baseCtx.HistogramWindowInterval) ctx.heartbeatTimeout = 2 * ctx.heartbeatInterval + ctx.metrics = makeMetrics() stopper.RunWorker(ctx.masterCtx, func(context.Context) { <-stopper.ShouldQuiesce() @@ -457,6 +473,11 @@ func (ctx *Context) GetStatsMap() *syncmap.Map { return &ctx.stats.stats } +// Metrics returns the Context's Metrics struct. +func (ctx *Context) Metrics() *Metrics { + return &ctx.metrics +} + // GetLocalInternalClientForAddr returns the context's internal batch client // for target, if it exists. func (ctx *Context) GetLocalInternalClientForAddr( @@ -818,7 +839,15 @@ func (ctx *Context) TestingConnHealth(target string, nodeID roachpb.NodeID) erro func (ctx *Context) runHeartbeat( conn *Connection, target string, redialChan <-chan struct{}, -) error { +) (retErr error) { + ctx.metrics.HeartbeatLoopsStarted.Inc(1) + state := updateHeartbeatState(&ctx.metrics, heartbeatNotRunning, heartbeatInitializing) + defer func() { + if retErr != nil { + ctx.metrics.HeartbeatLoopsExited.Inc(1) + } + updateHeartbeatState(&ctx.metrics, state, heartbeatNotRunning) + }() maxOffset := ctx.LocalClock.MaxOffset() maxOffsetNanos := maxOffset.Nanoseconds() @@ -897,12 +926,14 @@ func (ctx *Context) runHeartbeat( cb() } } - conn.heartbeatResult.Store(heartbeatResult{ + + hr := heartbeatResult{ everSucceeded: everSucceeded, err: err, - }) + } + state = updateHeartbeatState(&ctx.metrics, state, hr.state()) + conn.heartbeatResult.Store(hr) conn.setInitialHeartbeatDone() - return nil }); err != nil { return err diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 3019046fcbb0..e8371b9a483f 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -196,7 +196,7 @@ func TestHeartbeatHealth(t *testing.T) { const serverNodeID = 1 const clientNodeID = 2 - serverCtx := 
newTestContext(clusterID, clock, stopper) + serverCtx := newTestContext(clusterID, clock, stop.NewStopper()) serverCtx.NodeID.Set(context.TODO(), serverNodeID) s := newTestServer(t, serverCtx) @@ -210,12 +210,6 @@ func TestHeartbeatHealth(t *testing.T) { } RegisterHeartbeatServer(s, heartbeat) - ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr) - if err != nil { - t.Fatal(err) - } - remoteAddr := ln.Addr().String() - errFailedHeartbeat := errors.New("failed heartbeat") var hbSuccess atomic.Value @@ -257,6 +251,12 @@ func TestHeartbeatHealth(t *testing.T) { clientCtx.AdvertiseAddr = lisLocalServer.Addr().String() // Make the interval shorter to speed up the test. clientCtx.heartbeatInterval = 1 * time.Millisecond + + ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr) + if err != nil { + t.Fatal(err) + } + remoteAddr := ln.Addr().String() if _, err := clientCtx.GRPCDialNode( remoteAddr, serverNodeID).Connect(context.Background()); err != nil { t.Fatal(err) @@ -270,6 +270,8 @@ func TestHeartbeatHealth(t *testing.T) { } return err }) + assertGauges(t, clientCtx.Metrics(), + 0 /* initializing */, 1 /* nominal */, 0 /* failed */) // Should be unhealthy in the presence of failing heartbeats. hbSuccess.Store(false) @@ -279,12 +281,16 @@ func TestHeartbeatHealth(t *testing.T) { } return nil }) + assertGauges(t, clientCtx.Metrics(), + 0 /* initializing */, 0 /* nominal */, 1 /* failed */) // Should become healthy in the presence of successful heartbeats. hbSuccess.Store(true) testutils.SucceedsSoon(t, func() error { return clientCtx.TestingConnHealth(remoteAddr, serverNodeID) }) + assertGauges(t, clientCtx.Metrics(), + 0 /* initializing */, 1 /* nominal */, 0 /* failed */) // Should become unhealthy again in the presence of failing heartbeats. 
hbSuccess.Store(false) @@ -294,12 +300,18 @@ func TestHeartbeatHealth(t *testing.T) { } return nil }) + assertGauges(t, clientCtx.Metrics(), + 0 /* initializing */, 0 /* nominal */, 1 /* failed */) // Should become healthy in the presence of successful heartbeats. hbSuccess.Store(true) testutils.SucceedsSoon(t, func() error { return clientCtx.TestingConnHealth(remoteAddr, serverNodeID) }) + assertGauges(t, clientCtx.Metrics(), + 0 /* initializing */, 1 /* nominal */, 0 /* failed */) + + // Ensure that non-existing connections return ErrNotHeartbeated. lisNonExistentConnection, err := net.Listen("tcp", "127.0.0.1:0") defer func() { @@ -311,11 +323,20 @@ func TestHeartbeatHealth(t *testing.T) { if err := clientCtx.TestingConnHealth(lisNonExistentConnection.Addr().String(), 3); err != ErrNotHeartbeated { t.Errorf("wanted ErrNotHeartbeated, not %v", err) } + // The connection to Node 3 on the lisNonExistentConnection should be + // initializing and the server connection should be nominal. + testutils.SucceedsSoon(t, func() error { + return checkGauges(clientCtx.Metrics(), + 1 /* initializing */, 1 /* nominal */, 0 /* failed */) + }) if err := clientCtx.TestingConnHealth(clientCtx.Addr, clientNodeID); err != ErrNotHeartbeated { t.Errorf("wanted ErrNotHeartbeated, not %v", err) } + // Ensure that the local Addr returns ErrNotHeartbeated without having dialed + // a connection but the local AdvertiseAddr successfully returns no error when + // an internal server has been registered. clientCtx.SetLocalInternalServer(&internalServer{}) if err := clientCtx.TestingConnHealth(clientCtx.Addr, clientNodeID); err != ErrNotHeartbeated { @@ -324,6 +345,47 @@ func TestHeartbeatHealth(t *testing.T) { if err := clientCtx.TestingConnHealth(clientCtx.AdvertiseAddr, clientNodeID); err != nil { t.Error(err) } + + // Ensure that when the server closes its connection the context attempts to + // reconnect. 
Both the server connection on Node 1 and the non-existent + // connection should be initializing. + serverCtx.Stopper.Stop(context.Background()) + testutils.SucceedsSoon(t, func() error { + return checkGauges(clientCtx.Metrics(), + 2 /* initializing */, 0 /* nominal */, 0 /* failed */) + }) + const expNumStarted = 3 // 2 for the server and 1 for the non-existent conn + numStarted := clientCtx.Metrics().HeartbeatLoopsStarted.Count() + if numStarted != expNumStarted { + t.Fatalf("expected %d heartbeat loops to have been started, got %d", + expNumStarted, numStarted) + } + const expNumExited = 1 // 1 for the server upon shutdown + numExited := clientCtx.Metrics().HeartbeatLoopsExited.Count() + if numExited != expNumExited { + t.Fatalf("expected %d heartbeat loops to have exited, got %d", + expNumExited, numExited) + } +} + +func checkGauges(m *Metrics, initializing, nominal, failed int64) error { + if got := m.HeartbeatsInitializing.Value(); got != initializing { + return errors.Errorf("expected %d initializing heartbeats, got %d", initializing, got) + } + if got := m.HeartbeatsNominal.Value(); got != nominal { + return errors.Errorf("expected %d nominal heartbeats, got %d", nominal, got) + } + if got := m.HeartbeatsFailed.Value(); got != failed { + return errors.Errorf("expected %d failed heartbeats, got %d", failed, got) + } + return nil +} + +func assertGauges(t *testing.T, m *Metrics, initializing, nominal, failed int64) { + t.Helper() + if err := checkGauges(m, initializing, nominal, failed); err != nil { + t.Error(err) + } } // TestConnectionRemoveNodeIDZero verifies that when a connection initiated via diff --git a/pkg/rpc/metrics.go b/pkg/rpc/metrics.go new file mode 100644 index 000000000000..1cd0a2082975 --- /dev/null +++ b/pkg/rpc/metrics.go @@ -0,0 +1,138 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package rpc + +import "github.com/cockroachdb/cockroach/pkg/util/metric" + +// We want to have a way to track the number of connections +// but we also want to have a way to know the health of those connections. +// +// For this we're going to add a variety of metrics. +// One will be a gauge of how many heartbeat loops are in which state +// and others will be counters for heartbeat loops started and failed. + +var ( + // The below gauges store the current state of running heartbeat loops. + // Gauges are useful for examining the current state of a system but can hide + // information in the face of rapidly changing values. The context + // additionally keeps counters for the number of heartbeat loops started + // and for the number of heartbeat loops which have exited with an error. + // Together these metrics should provide a picture of the state of current + // connections.
+ + metaHeartbeatsInitializing = metric.Metadata{ + Name: "rpc.heartbeats.initializing", + Help: "Gauge of current connections in the initializing state", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } + metaHeartbeatsNominal = metric.Metadata{ + Name: "rpc.heartbeats.nominal", + Help: "Gauge of current connections in the nominal state", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } + metaHeartbeatsFailed = metric.Metadata{ + Name: "rpc.heartbeats.failed", + Help: "Gauge of current connections in the failed state", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } + + metaHeartbeatLoopsStarted = metric.Metadata{ + Name: "rpc.heartbeats.loops.started", + Help: "Counter of the number of connection heartbeat loops which " + + "have been started", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } + metaHeartbeatLoopsExited = metric.Metadata{ + Name: "rpc.heartbeats.loops.exited", + Help: "Counter of the number of connection heartbeat loops which " + + "have exited with an error", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } +) + +type heartbeatState int + +const ( + heartbeatNotRunning heartbeatState = iota + heartbeatInitializing + heartbeatNominal + heartbeatFailed +) + +func makeMetrics() Metrics { + return Metrics{ + HeartbeatLoopsStarted: metric.NewCounter(metaHeartbeatLoopsStarted), + HeartbeatLoopsExited: metric.NewCounter(metaHeartbeatLoopsExited), + HeartbeatsInitializing: metric.NewGauge(metaHeartbeatsInitializing), + HeartbeatsNominal: metric.NewGauge(metaHeartbeatsNominal), + HeartbeatsFailed: metric.NewGauge(metaHeartbeatsFailed), + } +} + +// Metrics is a metrics struct for Context metrics. +type Metrics struct { + + // HeartbeatLoopsStarted is a counter which tracks the number of heartbeat + // loops which have been started. + HeartbeatLoopsStarted *metric.Counter + + // HeartbeatLoopsExited is a counter which tracks the number of heartbeat + // loops which have exited with an error. 
The only time a heartbeat loop + // exits without an error is during server shutdown. + HeartbeatLoopsExited *metric.Counter + + // HeartbeatsInitializing tracks the current number of heartbeat loops + // which have not yet ever succeeded. + HeartbeatsInitializing *metric.Gauge + // HeartbeatsNominal tracks the current number of heartbeat loops which + // succeeded on their previous attempt. + HeartbeatsNominal *metric.Gauge + // HeartbeatsFailed tracks the current number of heartbeat loops which + // succeeded in the past but failed on their previous attempt. + HeartbeatsFailed *metric.Gauge +} + +// updateHeartbeatState decrements the gauge for the current state and +// increments the gauge for the new state, returning the new state. +func updateHeartbeatState(m *Metrics, old, new heartbeatState) heartbeatState { + if old == new { + return new + } + if g := heartbeatGauge(m, new); g != nil { + g.Inc(1) + } + if g := heartbeatGauge(m, old); g != nil { + g.Dec(1) + } + return new +} + +// heartbeatGauge returns the appropriate gauge for the given heartbeatState. +func heartbeatGauge(m *Metrics, s heartbeatState) (g *metric.Gauge) { + switch s { + case heartbeatInitializing: + g = m.HeartbeatsInitializing + case heartbeatNominal: + g = m.HeartbeatsNominal + case heartbeatFailed: + g = m.HeartbeatsFailed + } + return g +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 5e3b3ffcff9b..51318715f5c1 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -253,6 +253,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { log.Fatal(ctx, err) } } + s.registry.AddMetricStruct(s.rpcContext.Metrics()) s.grpc = newGRPCServer(s.rpcContext)