Skip to content

Commit

Permalink
Add health checker to leader election library
Browse files Browse the repository at this point in the history
  • Loading branch information
verult committed Nov 12, 2020
1 parent a833d13 commit d93b265
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 34 deletions.
36 changes: 36 additions & 0 deletions leaderelection/leader_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"regexp"
"strings"
Expand All @@ -39,6 +40,9 @@ const (
defaultLeaseDuration = 15 * time.Second
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 5 * time.Second
healthCheckTimeout = 20 * time.Second

HealthCheckerAddress = "/healthz/leader-election"
)

// leaderElection is a convenience wrapper around client-go's leader election library.
Expand All @@ -55,6 +59,9 @@ type leaderElection struct {
// valid options are resourcelock.LeasesResourceLock, resourcelock.EndpointsResourceLock,
// and resourcelock.ConfigMapsResourceLock
resourceLock string
// healthCheck reports unhealthy if leader election fails to renew leadership
// within a timeout period.
healthCheck *leaderelection.HealthzAdaptor

leaseDuration time.Duration
renewDeadline time.Duration
Expand All @@ -76,6 +83,7 @@ func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string
runFunc: runFunc,
lockName: lockName,
resourceLock: resourcelock.LeasesResourceLock,
healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout),
leaseDuration: defaultLeaseDuration,
renewDeadline: defaultRenewDeadline,
retryPeriod: defaultRetryPeriod,
Expand All @@ -89,6 +97,7 @@ func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName str
runFunc: runFunc,
lockName: lockName,
resourceLock: resourcelock.EndpointsResourceLock,
healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout),
leaseDuration: defaultLeaseDuration,
renewDeadline: defaultRenewDeadline,
retryPeriod: defaultRetryPeriod,
Expand All @@ -102,6 +111,7 @@ func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName st
runFunc: runFunc,
lockName: lockName,
resourceLock: resourcelock.ConfigMapsResourceLock,
healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout),
leaseDuration: defaultLeaseDuration,
renewDeadline: defaultRenewDeadline,
retryPeriod: defaultRetryPeriod,
Expand Down Expand Up @@ -134,6 +144,19 @@ func (l *leaderElection) WithContext(ctx context.Context) {
l.ctx = ctx
}

// Server represents any type that could serve HTTP requests for the leader
// election health check endpoint.
type Server interface {
Handle(pattern string, handler http.Handler)
}

// RegisterHealthCheck creates a health check for this leader election object and
// registers its HTTP handler to the given server at the path specified by the
// constant "healthCheckerAddress".
func (l *leaderElection) RegisterHealthCheck(s Server) {
s.Handle(HealthCheckerAddress, adaptCheckToHandler(l.healthCheck.Check))
}

func (l *leaderElection) Run() error {
if l.identity == "" {
id, err := defaultLeaderElectionIdentity()
Expand Down Expand Up @@ -179,6 +202,7 @@ func (l *leaderElection) Run() error {
klog.V(3).Infof("new leader detected, current leader: %s", identity)
},
},
WatchDog: l.healthCheck,
}

ctx := l.ctx
Expand Down Expand Up @@ -220,3 +244,15 @@ func inClusterNamespace() string {

return "default"
}

// adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks.
func adaptCheckToHandler(c func(r *http.Request) error) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := c(r)
if err != nil {
http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError)
} else {
fmt.Fprint(w, "ok")
}
})
}
36 changes: 13 additions & 23 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/component-base/metrics"
"k8s.io/klog/v2"
)

const (
Expand Down Expand Up @@ -90,10 +89,15 @@ type CSIMetricsManager interface {
// driverName - Name of the CSI driver against which this operation was executed.
SetDriverName(driverName string)

// StartMetricsEndpoint starts the metrics endpoint at the specified address/path
// for this metrics manager.
// If the metricsAddress is an empty string, this will be a no op.
StartMetricsEndpoint(metricsAddress, metricsPath string)
// RegisterToServer registers an HTTP handler for this metrics manager to the
// given server at the specified address/path.
RegisterToServer(s Server, metricsPath string)
}

// Server represents any type that could serve HTTP requests for the metrics
// endpoint.
type Server interface {
Handle(pattern string, handler http.Handler)
}

// MetricsManagerOption is used to pass optional configuration to a
Expand Down Expand Up @@ -325,27 +329,13 @@ func (cmm *csiMetricsManager) SetDriverName(driverName string) {
}
}

// StartMetricsEndpoint starts the metrics endpoint at the specified address/path
// for this metrics manager on a new go routine.
// If the metricsAddress is an empty string, this will be a no op.
func (cmm *csiMetricsManager) StartMetricsEndpoint(metricsAddress, metricsPath string) {
if metricsAddress == "" {
klog.Warningf("metrics endpoint will not be started because `metrics-address` was not specified.")
return
}

http.Handle(metricsPath, metrics.HandlerFor(
// RegisterToServer registers an HTTP handler for this metrics manager to the
// given server at the specified address/path.
func (cmm *csiMetricsManager) RegisterToServer(s Server, metricsPath string) {
s.Handle(metricsPath, metrics.HandlerFor(
cmm.GetRegistry(),
metrics.HandlerOpts{
ErrorHandling: metrics.ContinueOnError}))

// Spawn a new go routine to listen on specified endpoint
go func() {
err := http.ListenAndServe(metricsAddress, nil)
if err != nil {
klog.Fatalf("Failed to start prometheus metrics endpoint on specified address (%q) and path (%q): %s", metricsAddress, metricsPath, err)
}
}()
}

// VerifyMetricsMatch is a helper function that verifies that the expected and
Expand Down
20 changes: 9 additions & 11 deletions metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package metrics
import (
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -481,29 +482,26 @@ func TestRecordMetrics_Negative(t *testing.T) {
}
}

func TestStartMetricsEndPoint_Noop(t *testing.T) {
func TestRegisterToServer_Noop(t *testing.T) {
// Arrange
cmm := NewCSIMetricsManagerForSidecar(
"fake.csi.driver.io" /* driverName */)
operationDuration, _ := time.ParseDuration("20s")
mux := http.NewServeMux()

// Act
cmm.StartMetricsEndpoint(":8080", "/metrics")
cmm.RegisterToServer(mux, "/metrics")
cmm.RecordMetrics(
"/csi.v1.Controller/ControllerGetCapabilities", /* operationName */
nil, /* operationErr */
operationDuration /* operationDuration */)

// Assert
request, err := http.NewRequest("GET", "http://localhost:8080/metrics", strings.NewReader(""))
if err != nil {
t.Fatalf("Creating request for metrics endpoint failed: %v", err)
}
client := &http.Client{}
resp, err := client.Do(request)
if err != nil {
t.Fatalf("Failed to GET metrics. Error: %v", err)
}
request := httptest.NewRequest("GET", "/metrics", strings.NewReader(""))
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, request)
resp := rec.Result()

if resp.StatusCode != 200 {
t.Fatalf("/metrics response status not 200. Response was: %+v", resp)
}
Expand Down

0 comments on commit d93b265

Please sign in to comment.