Skip to content

Commit

Permalink
feat: add metrics collector
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 committed Nov 20, 2024
1 parent 76d8f13 commit 7116264
Show file tree
Hide file tree
Showing 3 changed files with 413 additions and 3 deletions.
23 changes: 20 additions & 3 deletions controllers/greptimedbcluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/GreptimeTeam/greptimedb-operator/controllers/constant"
"github.com/GreptimeTeam/greptimedb-operator/controllers/greptimedbcluster/deployers"
"github.com/GreptimeTeam/greptimedb-operator/pkg/deployer"
"github.com/GreptimeTeam/greptimedb-operator/pkg/metrics"
)

const (
Expand All @@ -51,9 +52,10 @@ var (
type Reconciler struct {
client.Client

Scheme *runtime.Scheme
Deployers []deployer.Deployer
Recorder record.EventRecorder
Scheme *runtime.Scheme
Deployers []deployer.Deployer
Recorder record.EventRecorder
MetricsCollector *metrics.MetricsCollector
}

func Setup(mgr ctrl.Manager, _ *options.Options) error {
Expand All @@ -63,6 +65,12 @@ func Setup(mgr ctrl.Manager, _ *options.Options) error {
Recorder: mgr.GetEventRecorderFor("greptimedbcluster-controller"),
}

metricsCollector, err := metrics.NewMetricsCollector()
if err != nil {
return err
}
reconciler.MetricsCollector = metricsCollector

// sync will execute the sync logic of multiple deployers in order.
reconciler.Deployers = []deployer.Deployer{
deployers.NewMonitoringDeployer(mgr),
Expand Down Expand Up @@ -222,6 +230,15 @@ func (r *Reconciler) sync(ctx context.Context, cluster *v1alpha1.GreptimeDBClust
}
}

if cluster.Status.ClusterPhase == v1alpha1.PhaseRunning {
if err := r.MetricsCollector.CollectClusterPodMetrics(ctx, cluster); err != nil {
klog.Errorf("Failed to collect cluster pod metrics: '%v'", err)

// We will not return error here because it is not a critical issue.
return ctrl.Result{}, nil
}
}

return ctrl.Result{}, nil
}

Expand Down
333 changes: 333 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"context"
"errors"
"fmt"
"regexp"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/metrics"

greptimev1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/GreptimeTeam/greptimedb-operator/controllers/common"
"github.com/GreptimeTeam/greptimedb-operator/controllers/constant"
)

// MetricPrefix is the common prefix of every metric exported by this package.
// It is joined to the metric's own name with an underscore.
const MetricPrefix = "greptimedb_operator"

// Histograms describing the startup phases of the pods managed by the operator.
//
// Label values are passed positionally by the collectors, so the order of the
// label names below must not change.
var (
	// podStartupDuration observes, in seconds, the time between pod creation
	// and the pod's Ready condition transition.
	podStartupDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: metricName("pod_startup_duration_seconds"),
			Help: "The duration from pod startup to all containers are ready.",

			// Exponential buckets from 1s to 10min.
			Buckets: prometheus.ExponentialBucketsRange(1, 600, 12),
		},
		[]string{"namespace", "resource", "pod", "node", "role"},
	)

	// podSchedulingDuration observes, in seconds, the time between pod
	// creation and the pod's PodScheduled condition transition.
	podSchedulingDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: metricName("pod_scheduling_duration_seconds"),
			Help: "The duration from pod startup to scheduled.",

			// Exponential buckets from 1s to 10min.
			Buckets: prometheus.ExponentialBucketsRange(1, 600, 12),
		},
		[]string{"namespace", "resource", "pod", "node", "role"},
	)

	// podInitializingDuration observes, in seconds, the time between the
	// PodScheduled and the Initialized condition transitions.
	podInitializingDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: metricName("pod_initializing_duration_seconds"),
			Help: "The duration from pod scheduled to initialized.",

			// Exponential buckets from 1s to 10min.
			Buckets: prometheus.ExponentialBucketsRange(1, 600, 12),
		},
		[]string{"namespace", "resource", "pod", "node", "role"},
	)

	// podContainerStartupDuration observes, in seconds, the time between the
	// Initialized condition transition and the start of each container.
	podContainerStartupDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: metricName("pod_container_startup_duration_seconds"),
			Help: "The duration from pod initialized to container running.",

			// Exponential buckets from 1s to 10min.
			Buckets: prometheus.ExponentialBucketsRange(1, 600, 12),
		},
		[]string{"namespace", "resource", "pod", "container", "node", "role"},
	)

	// podImagePullingDuration observes the image pulling time reported by the
	// pod's events. Unlike the other histograms its unit is MILLISECONDS.
	podImagePullingDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: metricName("pod_image_pulling_duration_milliseconds"),
			Help: "The duration for pod image pulling.",

			// Exponential buckets from 1s to 10min, expressed in milliseconds
			// because observations are recorded in milliseconds. Reusing the
			// seconds-scaled bounds (1..600) would push every pull slower than
			// 600ms into the +Inf bucket.
			Buckets: prometheus.ExponentialBucketsRange(1000, 600000, 12),
		},
		[]string{"namespace", "resource", "pod", "node", "role", "image"},
	)
)

// ErrEmptyEvents is returned when no events can be found for a pod, for
// example because they have already been garbage collected.
var ErrEmptyEvents = errors.New("no events found")

// init registers all pod histograms with the controller-runtime metrics
// registry. MustRegister panics if any of them cannot be registered.
func init() {
	metrics.Registry.MustRegister(
		podStartupDuration,
		podSchedulingDuration,
		podInitializingDuration,
		podContainerStartupDuration,
		podImagePullingDuration,
	)
}

// MetricsCollector collects pod metrics (startup, scheduling, initializing,
// container startup and image pulling durations) and records them in the
// histograms of this package.
type MetricsCollector struct {
// client is the Kubernetes client used to list the pods of a cluster and the
// events of each pod.
client client.Client
}

// NewMetricsCollector builds a MetricsCollector backed by its own Kubernetes
// client, created from the configuration resolved by controller-runtime.
//
// It returns an error when the configuration cannot be loaded or the client
// cannot be constructed.
func NewMetricsCollector() (*MetricsCollector, error) {
	restConfig, err := config.GetConfig()
	if err != nil {
		return nil, err
	}

	// A dedicated client is created for metrics collection.
	k8sClient, err := client.New(restConfig, client.Options{})
	if err != nil {
		return nil, err
	}

	collector := &MetricsCollector{client: k8sClient}
	return collector, nil
}

// CollectClusterPodMetrics records pod metrics for each component of the
// cluster whose getter returns a non-nil value, visiting meta, datanode,
// frontend and flownode in that order.
//
// Collection stops at the first failure and that error is returned.
func (c *MetricsCollector) CollectClusterPodMetrics(ctx context.Context, cluster *greptimev1alpha1.GreptimeDBCluster) error {
	// collectIf gathers the metrics of one role, but only when it is configured.
	collectIf := func(configured bool, role greptimev1alpha1.ComponentKind) error {
		if !configured {
			return nil
		}
		return c.collectPodMetricsByRole(ctx, cluster, role)
	}

	if err := collectIf(cluster.GetMeta() != nil, greptimev1alpha1.MetaComponentKind); err != nil {
		return err
	}

	if err := collectIf(cluster.GetDatanode() != nil, greptimev1alpha1.DatanodeComponentKind); err != nil {
		return err
	}

	if err := collectIf(cluster.GetFrontend() != nil, greptimev1alpha1.FrontendComponentKind); err != nil {
		return err
	}

	return collectIf(cluster.GetFlownode() != nil, greptimev1alpha1.FlownodeComponentKind)
}

// collectPodMetricsByRole lists the pods that belong to the given role of the
// cluster and records the metrics of each of them in turn. The first error,
// from listing or from any pod, aborts the loop and is returned.
func (c *MetricsCollector) collectPodMetricsByRole(ctx context.Context, cluster *greptimev1alpha1.GreptimeDBCluster, role greptimev1alpha1.ComponentKind) error {
	rolePods, err := c.getPods(ctx, cluster, role)
	if err != nil {
		return err
	}

	for i := range rolePods {
		err := c.collectPodMetrics(ctx, cluster.Name, &rolePods[i], role)
		if err != nil {
			return err
		}
	}

	return nil
}

// collectPodMetrics records the startup-phase durations of a single pod.
//
// The following values are observed, all in seconds:
//   - container startup: container start time minus the PodInitialized
//     transition time, for every container that is currently running;
//   - pod startup: PodReady transition time minus the pod creation time;
//   - scheduling: PodScheduled transition time minus the pod creation time;
//   - initializing: PodInitialized transition time minus the PodScheduled
//     transition time.
//
// Afterwards the image pulling duration is collected from the pod's events.
//
// A pod that has no conditions yet is skipped without error. An error is
// returned when one of the PodScheduled, PodInitialized or PodReady conditions
// is missing, or when collecting the image pulling duration fails.
//
// NOTE(review): every call adds a new observation for the same pod; if this is
// invoked on each reconcile the histograms will count a pod more than once —
// confirm whether de-duplication is needed.
func (c *MetricsCollector) collectPodMetrics(ctx context.Context, clusterName string, pod *corev1.Pod, role greptimev1alpha1.ComponentKind) error {
	if len(pod.Status.Conditions) == 0 {
		return nil
	}

	startTime := pod.GetCreationTimestamp().Time

	scheduledTime, err := c.getPodConditionTime(&pod.Status, corev1.PodScheduled)
	if err != nil {
		return fmt.Errorf("get pod scheduled time: %w", err)
	}

	initializedTime, err := c.getPodConditionTime(&pod.Status, corev1.PodInitialized)
	if err != nil {
		return fmt.Errorf("get pod initialized time: %w", err)
	}

	readyTime, err := c.getPodConditionTime(&pod.Status, corev1.PodReady)
	if err != nil {
		return fmt.Errorf("get pod ready time: %w", err)
	}

	// Collect container startup duration.
	// The calculation is: pod.Status.ContainerStatuses[*].State.Running.StartedAt - pod.Status.Conditions[corev1.PodInitialized].LastTransitionTime.Time.
	for i := range pod.Status.ContainerStatuses {
		status := &pod.Status.ContainerStatuses[i]

		// State.Running is a pointer that is only set while the container is
		// running; a waiting or terminated container has no start time to
		// observe, and dereferencing it would panic.
		running := status.State.Running
		if running == nil {
			continue
		}

		podContainerStartupDuration.WithLabelValues(
			pod.Namespace, clusterName, pod.Name, status.Name, pod.Spec.NodeName, string(role),
		).Observe(running.StartedAt.Time.Sub(*initializedTime).Seconds())
	}

	// Collect pod startup duration.
	// The calculation is: pod.Status.Conditions[corev1.PodReady].LastTransitionTime.Time - pod.GetCreationTimestamp().Time.
	podStartupDuration.WithLabelValues(
		pod.Namespace, clusterName, pod.Name, pod.Spec.NodeName, string(role),
	).Observe(readyTime.Sub(startTime).Seconds())

	// Collect pod scheduling duration.
	// The calculation is: pod.Status.Conditions[corev1.PodScheduled].LastTransitionTime.Time - pod.GetCreationTimestamp().Time.
	podSchedulingDuration.WithLabelValues(
		pod.Namespace, clusterName, pod.Name, pod.Spec.NodeName, string(role),
	).Observe(scheduledTime.Sub(startTime).Seconds())

	// Collect pod initializing duration.
	// The calculation is: pod.Status.Conditions[corev1.PodInitialized].LastTransitionTime.Time - pod.Status.Conditions[corev1.PodScheduled].LastTransitionTime.Time.
	podInitializingDuration.WithLabelValues(
		pod.Namespace, clusterName, pod.Name, pod.Spec.NodeName, string(role),
	).Observe(initializedTime.Sub(*scheduledTime).Seconds())

	return c.collectPodImagePullingDuration(ctx, clusterName, pod, role)
}

// getPodConditionTime returns a pointer to a copy of the LastTransitionTime of
// the first condition in podStatus whose type equals conditionType.
//
// The condition's status is not inspected. An error is returned when no
// condition of that type exists.
func (c *MetricsCollector) getPodConditionTime(podStatus *corev1.PodStatus, conditionType corev1.PodConditionType) (*time.Time, error) {
	for i := range podStatus.Conditions {
		if podStatus.Conditions[i].Type != conditionType {
			continue
		}

		// Copy the timestamp so the returned pointer does not alias the pod status.
		transitionedAt := podStatus.Conditions[i].LastTransitionTime.Time
		return &transitionedAt, nil
	}

	return nil, fmt.Errorf("condition %s not found", conditionType)
}

// getPods lists the pods in the cluster's namespace whose component-name label
// equals the resource name derived from the cluster name and componentKind.
func (c *MetricsCollector) getPods(ctx context.Context, cluster *greptimev1alpha1.GreptimeDBCluster, componentKind greptimev1alpha1.ComponentKind) ([]corev1.Pod, error) {
	componentSelector := metav1.LabelSelector{
		MatchLabels: map[string]string{
			constant.GreptimeDBComponentName: common.ResourceName(cluster.Name, componentKind),
		},
	}

	listOptions := []client.ListOption{
		client.InNamespace(cluster.Namespace),
		client.MatchingLabels(componentSelector.MatchLabels),
	}

	var podList corev1.PodList
	if err := c.client.List(ctx, &podList, listOptions...); err != nil {
		return nil, err
	}

	return podList.Items, nil
}

// collectPodImagePullingDuration looks up the image pulling duration of the pod
// from its events and observes it, in milliseconds, in podImagePullingDuration.
//
// ErrEmptyEvents is not treated as a failure: the pod is skipped. Any other
// lookup error is returned with added context.
func (c *MetricsCollector) collectPodImagePullingDuration(ctx context.Context, clusterName string, pod *corev1.Pod, role greptimev1alpha1.ComponentKind) error {
	image, pullDuration, err := c.getImagePullingDurationFromEvents(ctx, pod)
	switch {
	case errors.Is(err, ErrEmptyEvents):
		// Maybe the events are deleted by the garbage collector.
		return nil
	case err != nil:
		return fmt.Errorf("get image pulling duration: %v", err)
	}

	histogram := podImagePullingDuration.WithLabelValues(
		pod.Namespace, clusterName, pod.Name, pod.Spec.NodeName, string(role), image,
	)
	histogram.Observe(float64(pullDuration.Milliseconds()))

	return nil
}

// getImagePullingDurationFromEvents lists the events in the pod's namespace
// whose involved object UID is the pod's UID and extracts the image name and
// pulling duration from the first "Successfully pulled image" event.
//
// Only the first matching event is used, even if several images were pulled.
//
// It returns ErrEmptyEvents when there are no events at all or when none of
// them reports a successful image pull, so that callers never record a
// meaningless zero duration with an empty image name. A NotFound error from
// the list call is treated the same as an empty list. Any other list error, or
// a failure to parse the matching event's message, is returned as is.
func (c *MetricsCollector) getImagePullingDurationFromEvents(ctx context.Context, pod *corev1.Pod) (string, time.Duration, error) {
	const pulledMessage = "Successfully pulled image"

	events := &corev1.EventList{}
	if err := c.client.List(ctx, events,
		client.InNamespace(pod.Namespace), client.MatchingFields{"involvedObject.uid": string(pod.UID)},
	); err != nil && !k8serrors.IsNotFound(err) {
		return "", 0, err
	}

	for i := range events.Items {
		message := events.Items[i].Message
		if !strings.Contains(message, pulledMessage) {
			continue
		}

		imageName, err := parseImageName(message)
		if err != nil {
			return "", 0, err
		}

		duration, err := parseImagePullingDuration(message)
		if err != nil {
			return "", 0, err
		}

		return imageName, duration, nil
	}

	// Either the event list is empty or it holds no successful pull event.
	return "", 0, ErrEmptyEvents
}

// imagePullingDurationRegexp captures the duration that follows "in " in an
// image pulled event message. The duration is in time.Duration.String() form,
// which chains several units once it reaches a minute (for example "1m30.5s"),
// so one or more number+unit groups are accepted.
var imagePullingDurationRegexp = regexp.MustCompile(`in ((?:\d+(?:\.\d+)?(?:h|ms|µs|ns|m|s))+)`)

// parseImagePullingDuration extracts the image pulling duration from an event
// message such as:
//
//	Successfully pulled image "greptime/greptimedb:latest" in 314.950966ms (6.159733714s including waiting)
//
// Only the first duration after "in " is returned; the "including waiting"
// value is ignored. An error is returned when the message contains no such
// duration or when the captured text is not a valid duration.
func parseImagePullingDuration(message string) (time.Duration, error) {
	matches := imagePullingDurationRegexp.FindStringSubmatch(message)
	if len(matches) < 2 {
		return 0, fmt.Errorf("parse image pulling duration from message: %s", message)
	}

	duration, err := time.ParseDuration(matches[1])
	if err != nil {
		return 0, fmt.Errorf("parse duration %q: %w", matches[1], err)
	}

	return duration, nil
}

// imageNameRegexp captures the first non-empty double-quoted string of a
// message. It is compiled once instead of on every call.
var imageNameRegexp = regexp.MustCompile(`"([^"]+)"`)

// parseImageName returns the first double-quoted, non-empty string found in
// message, which for an image pulled event is the image name, e.g.
// greptime/greptimedb:latest in:
//
//	Successfully pulled image "greptime/greptimedb:latest" in 314.950966ms
//
// An error is returned when the message contains no such quoted string.
func parseImageName(message string) (string, error) {
	matches := imageNameRegexp.FindStringSubmatch(message)
	if len(matches) < 2 {
		return "", fmt.Errorf("failed to parse image name from message: %s", message)
	}

	return matches[1], nil
}

func metricName(name string) string {
return fmt.Sprintf("%s_%s", MetricPrefix, name)
}
Loading

0 comments on commit 7116264

Please sign in to comment.