Skip to content

Commit

Permalink
Merge pull request kubernetes#104328 from pohly/release-1.21-metrics-…
Browse files Browse the repository at this point in the history
…grabbing

release 1.21: metrics grabbing
  • Loading branch information
k8s-ci-robot authored Oct 4, 2021
2 parents 8724967 + e0f8a61 commit 60d7177
Show file tree
Hide file tree
Showing 18 changed files with 407 additions and 97 deletions.
2 changes: 1 addition & 1 deletion test/e2e/apimachinery/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func verifyRemainingObjects(f *framework.Framework, objects map[string]int) (boo
func gatherMetrics(f *framework.Framework) {
ginkgo.By("Gathering metrics")
var summary framework.TestDataSummary
grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, false, false, true, false, false)
grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), false, false, true, false, false)
if err != nil {
framework.Logf("Failed to create MetricsGrabber. Skipping metrics gathering.")
} else {
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (f *Framework) BeforeEach() {

gatherMetricsAfterTest := TestContext.GatherMetricsAfterTest == "true" || TestContext.GatherMetricsAfterTest == "master"
if gatherMetricsAfterTest && TestContext.IncludeClusterAutoscalerMetrics {
grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics)
grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics)
if err != nil {
Logf("Failed to create MetricsGrabber (skipping ClusterAutoscaler metrics gathering before test): %v", err)
} else {
Expand Down Expand Up @@ -449,7 +449,7 @@ func (f *Framework) AfterEach() {
ginkgo.By("Gathering metrics")
// Grab apiserver, scheduler, controller-manager metrics and (optionally) nodes' kubelet metrics.
grabMetricsFromKubelets := TestContext.GatherMetricsAfterTest != "master" && !ProviderIs("kubemark")
grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics)
grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics)
if err != nil {
Logf("Failed to create MetricsGrabber (skipping metrics gathering): %v", err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/framework/metrics/kubelet_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (KubeletM
if c == nil {
return GrabKubeletMetricsWithoutProxy(nodeName, "/metrics")
}
grabber, err := NewMetricsGrabber(c, nil, true, false, false, false, false)
grabber, err := NewMetricsGrabber(c, nil, nil, true, false, false, false, false)
if err != nil {
return KubeletMetrics{}, err
}
Expand Down
186 changes: 141 additions & 45 deletions test/e2e/framework/metrics/metrics_grabber.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,37 @@ package metrics

import (
"context"
"errors"
"fmt"
"net"
"regexp"
"sync"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"

"k8s.io/client-go/rest"
"k8s.io/klog/v2"

e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
)

const (
// insecureSchedulerPort is the default port for the scheduler status server.
// May be overridden by a flag at startup.
// Deprecated: use the secure kubeSchedulerPort instead.
insecureSchedulerPort = 10251
// insecureKubeControllerManagerPort is the default port for the controller manager status server.
// May be overridden by a flag at startup.
// Deprecated: use the secure kubeControllerManagerPort instead.
insecureKubeControllerManagerPort = 10252
// kubeSchedulerPort is the default port for the scheduler status server.
// It is the port queried through getSecureMetricsFromPod.
kubeSchedulerPort = 10259
// kubeControllerManagerPort is the default port for the controller manager status server.
// It is the port queried through getSecureMetricsFromPod.
kubeControllerManagerPort = 10257
)

// MetricsGrabbingDisabledError is the sentinel error that the
// Grabber.GrabFrom* methods wrap (via %w) when metrics grabbing is
// not supported for a component. Tests that check metrics data
// should detect it with errors.Is and then skip the check.
var MetricsGrabbingDisabledError = errors.New("metrics grabbing disabled")

// Collection is metrics collection of components
type Collection struct {
APIServerMetrics APIServerMetrics
Expand All @@ -56,25 +62,38 @@ type Collection struct {
type Grabber struct {
// client talks to the cluster under test; it is used for the pod
// readiness waits and is handed to the pod dialer.
client clientset.Interface
// externalClient may be nil; GrabFromClusterAutoscaler checks it for nil.
externalClient clientset.Interface
// config is the rest config that getSecureMetricsFromPod copies and
// passes to the pod dialer.
config *rest.Config
// The grabFrom* flags record for which components metrics grabbing is
// enabled. They are set once by NewMetricsGrabber.
grabFromAPIServer bool
// grabFromControllerManager gates GrabFromControllerManager.
grabFromControllerManager bool
grabFromKubelets bool
// grabFromScheduler gates GrabFromScheduler.
grabFromScheduler bool
grabFromClusterAutoscaler bool
// kubeScheduler is the name of the kube-scheduler pod, or "" if none
// was found by NewMetricsGrabber.
kubeScheduler string
// waitForSchedulerReadyOnce ensures that the wait for the scheduler
// pod to become ready runs only once per Grabber.
waitForSchedulerReadyOnce sync.Once
// kubeControllerManager is the name of the kube-controller-manager
// pod, or "" if none was found by NewMetricsGrabber.
kubeControllerManager string
// waitForControllerManagerReadyOnce ensures that the wait for the
// controller manager pod to become ready runs only once per Grabber.
waitForControllerManagerReadyOnce sync.Once
}

// NewMetricsGrabber returns new metrics which are initialized.
func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool) (*Grabber, error) {
// NewMetricsGrabber prepares for grabbing metrics data from several different
// components. It should be called when those components are running because
// it needs to communicate with them to determine for which components
// metrics data can be retrieved.
//
// Collecting metrics data is an optional debug feature. Not all clusters will
// support it. If disabled for a component, the corresponding Grab function
// will immediately return an error derived from MetricsGrabbingDisabledError.
func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *rest.Config, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool) (*Grabber, error) {

kubeScheduler := ""
kubeControllerManager := ""

regKubeScheduler := regexp.MustCompile("kube-scheduler-.*")
regKubeControllerManager := regexp.MustCompile("kube-controller-manager-.*")

if (scheduler || controllers) && config == nil {
return nil, errors.New("a rest config is required for grabbing kube-controller and kube-controller-manager metrics")
}

podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
Expand All @@ -93,31 +112,46 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets b
break
}
}
if kubeScheduler == "" {
scheduler = false
klog.Warningf("Can't find kube-scheduler pod. Grabbing metrics from kube-scheduler is disabled.")
}
if kubeControllerManager == "" {
controllers = false
klog.Warningf("Can't find kube-controller-manager pod. Grabbing metrics from kube-controller-manager is disabled.")
}
if ec == nil {
if clusterAutoscaler && ec == nil {
klog.Warningf("Did not receive an external client interface. Grabbing metrics from ClusterAutoscaler is disabled.")
}

return &Grabber{
client: c,
externalClient: ec,
config: config,
grabFromAPIServer: apiServer,
grabFromControllerManager: controllers,
grabFromControllerManager: checkPodDebugHandlers(c, controllers, "kube-controller-manager", kubeControllerManager),
grabFromKubelets: kubelets,
grabFromScheduler: scheduler,
grabFromScheduler: checkPodDebugHandlers(c, scheduler, "kube-scheduler", kubeScheduler),
grabFromClusterAutoscaler: clusterAutoscaler,
kubeScheduler: kubeScheduler,
kubeControllerManager: kubeControllerManager,
}, nil
}

// checkPodDebugHandlers decides whether metrics grabbing stays enabled for
// one control-plane component.
//
// It returns false without contacting the cluster when grabbing was not
// requested or when podName is empty (a warning is logged in the latter
// case). Otherwise it probes the pod in the kube-system namespace by asking
// for a single byte of its log output: if that request fails, a warning is
// logged and false is returned, because the debug handlers on the pod's host
// might be disabled. Only a successful probe yields true.
func checkPodDebugHandlers(c clientset.Interface, requested bool, component, podName string) bool {
	switch {
	case !requested:
		return false
	case podName == "":
		klog.Warningf("Can't find %s pod. Grabbing metrics from %s is disabled.", component, component)
		return false
	}

	// Retrieving log output is an indirect check for enabled debug
	// handlers; one byte is enough to find out whether the request works.
	maxBytes := int64(1)
	logOptions := &v1.PodLogOptions{LimitBytes: &maxBytes}
	logRequest := c.CoreV1().Pods(metav1.NamespaceSystem).GetLogs(podName, logOptions)
	_, probeErr := logRequest.DoRaw(context.TODO())
	if probeErr == nil {
		// Metrics gathering enabled.
		return true
	}

	klog.Warningf("Can't retrieve log output of %s (%q). Debug handlers might be disabled in kubelet. Grabbing metrics from %s is disabled.",
		podName, probeErr, component)
	return false
}

// HasControlPlanePods returns true if metrics grabber was able to find control-plane pods
func (g *Grabber) HasControlPlanePods() bool {
return g.kubeScheduler != "" && g.kubeControllerManager != ""
Expand Down Expand Up @@ -149,20 +183,38 @@ func (g *Grabber) grabFromKubeletInternal(nodeName string, kubeletPort int) (Kub

// GrabFromScheduler returns metrics from scheduler
func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) {
if g.kubeScheduler == "" {
return SchedulerMetrics{}, fmt.Errorf("kube-scheduler pod is not registered. Skipping Scheduler's metrics gathering")
if !g.grabFromScheduler {
return SchedulerMetrics{}, fmt.Errorf("kube-scheduler: %w", MetricsGrabbingDisabledError)
}
output, err := g.getMetricsFromPod(g.client, g.kubeScheduler, metav1.NamespaceSystem, insecureSchedulerPort)

var err error

g.waitForSchedulerReadyOnce.Do(func() {
if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(g.client, g.kubeScheduler, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
err = fmt.Errorf("error waiting for kube-scheduler pod to be ready: %w", readyErr)
}
})
if err != nil {
return SchedulerMetrics{}, err
}

var lastMetricsFetchErr error
var output string
if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeScheduler, metav1.NamespaceSystem, kubeSchedulerPort)
return lastMetricsFetchErr == nil, nil
}); metricsWaitErr != nil {
err := fmt.Errorf("error waiting for kube-scheduler pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
return SchedulerMetrics{}, err
}

return parseSchedulerMetrics(output)
}

// GrabFromClusterAutoscaler returns metrics from cluster autoscaler
func (g *Grabber) GrabFromClusterAutoscaler() (ClusterAutoscalerMetrics, error) {
if !g.HasControlPlanePods() && g.externalClient == nil {
return ClusterAutoscalerMetrics{}, fmt.Errorf("Did not find control-plane pods. Skipping ClusterAutoscaler's metrics gathering")
return ClusterAutoscalerMetrics{}, fmt.Errorf("ClusterAutoscaler: %w", MetricsGrabbingDisabledError)
}
var client clientset.Interface
var namespace string
Expand All @@ -182,35 +234,31 @@ func (g *Grabber) GrabFromClusterAutoscaler() (ClusterAutoscalerMetrics, error)

// GrabFromControllerManager returns metrics from controller manager
func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error) {
if g.kubeControllerManager == "" {
return ControllerManagerMetrics{}, fmt.Errorf("kube-controller-manager pod is not registered. Skipping ControllerManager's metrics gathering")
if !g.grabFromControllerManager {
return ControllerManagerMetrics{}, fmt.Errorf("kube-controller-manager: %w", MetricsGrabbingDisabledError)
}

var err error
podName := g.kubeControllerManager
g.waitForControllerManagerReadyOnce.Do(func() {
if readyErr := e2epod.WaitForPodsReady(g.client, metav1.NamespaceSystem, podName, 0); readyErr != nil {
err = fmt.Errorf("error waiting for controller manager pod to be ready: %w", readyErr)
return
}

var lastMetricsFetchErr error
if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
_, lastMetricsFetchErr = g.getMetricsFromPod(g.client, podName, metav1.NamespaceSystem, insecureKubeControllerManagerPort)
return lastMetricsFetchErr == nil, nil
}); metricsWaitErr != nil {
err = fmt.Errorf("error waiting for controller manager pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
return
g.waitForControllerManagerReadyOnce.Do(func() {
if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(g.client, g.kubeControllerManager, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
err = fmt.Errorf("error waiting for kube-controller-manager pod to be ready: %w", readyErr)
}
})
if err != nil {
return ControllerManagerMetrics{}, err
}

output, err := g.getMetricsFromPod(g.client, podName, metav1.NamespaceSystem, insecureKubeControllerManagerPort)
if err != nil {
var output string
var lastMetricsFetchErr error
if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeControllerManager, metav1.NamespaceSystem, kubeControllerManagerPort)
return lastMetricsFetchErr == nil, nil
}); metricsWaitErr != nil {
err := fmt.Errorf("error waiting for kube-controller-manager to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
return ControllerManagerMetrics{}, err
}

return parseControllerManagerMetrics(output)
}

Expand Down Expand Up @@ -281,16 +329,64 @@ func (g *Grabber) Grab() (Collection, error) {
return result, nil
}

// getMetricsFromPod retrieves metrics data from an insecure port.
func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string, namespace string, port int) (string, error) {
rawOutput, err := client.CoreV1().RESTClient().Get().
Namespace(namespace).
Resource("pods").
SubResource("proxy").
Name(fmt.Sprintf("%v:%v", podName, port)).
Name(fmt.Sprintf("%s:%d", podName, port)).
Suffix("metrics").
Do(context.TODO()).Raw()
if err != nil {
return "", err
}
return string(rawOutput), nil
}

// getSecureMetricsFromPod fetches the "metrics" path from a pod port that
// serves TLS and checks client credentials. Conceptually this resembles
// "kubectl port-forward" followed by "kubectl get --raw
// https://localhost:<port>/metrics".
//
// The request is sent with a copy of g.config whose connections are dialed
// straight to the given container port. Server certificate verification is
// switched off on that copy. The response body is returned as a string; any
// error from building the client or from the request is returned as is.
func (g *Grabber) getSecureMetricsFromPod(podName string, namespace string, port int) (string, error) {
	target := e2epod.Addr{
		Namespace: namespace,
		PodName:   podName,
		Port:      port,
	}
	podDialer := e2epod.NewDialer(g.client, g.config)

	cfg := rest.CopyConfig(g.config)
	// Every connection goes to the pod's port, no matter which network
	// or address the HTTP transport asks for.
	cfg.Dial = func(ctx context.Context, _, _ string) (net.Conn, error) {
		return podDialer.DialContainerPort(ctx, target)
	}

	// Host and ServerName were meant to make it possible to verify the
	// server. That got past the server name check, but certificate
	// validation still failed.
	cfg.Host = target.String()
	cfg.ServerName = "localhost"

	// Verifying the pod certificate against the API server's root CA
	// led to an "unknown root certificate" error. Disabling certificate
	// checking on the client side gets around that and should be good
	// enough for E2E testing.
	cfg.Insecure = true
	cfg.CAFile = ""
	cfg.CAData = nil

	// clientset.NewForConfig is used because its RESTClient() is
	// directly usable, in contrast to the client constructed by
	// rest.RESTClientFor().
	metricClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		return "", err
	}

	request := metricClient.RESTClient().Get().AbsPath("metrics")
	body, err := request.Do(context.TODO()).Raw()
	if err != nil {
		return "", err
	}
	return string(body), nil
}
Loading

0 comments on commit 60d7177

Please sign in to comment.