diff --git a/.chloggen/switchk8shpa.yaml b/.chloggen/switchk8shpa.yaml new file mode 100755 index 000000000000..0ed26e6778ff --- /dev/null +++ b/.chloggen/switchk8shpa.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sclusterreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Switch k8s.hpa metrics to use pdata. + +# One or more tracking issues related to the change +issues: [18250] diff --git a/receiver/k8sclusterreceiver/internal/collection/collector.go b/receiver/k8sclusterreceiver/internal/collection/collector.go index c0dcf2a1d004..772787236f06 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector.go @@ -21,6 +21,7 @@ import ( agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" quotav1 "github.com/openshift/api/quota/v1" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" @@ -58,7 +59,7 @@ import ( // an interface to interact with refactored code from SignalFx Agent which is // confined to the collection package. type DataCollector struct { - logger *zap.Logger + settings receiver.CreateSettings metricsStore *metricsStore metadataStore *metadata.Store nodeConditionsToReport []string @@ -66,9 +67,9 @@ type DataCollector struct { } // NewDataCollector returns a DataCollector. 
-func NewDataCollector(logger *zap.Logger, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector { +func NewDataCollector(set receiver.CreateSettings, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector { return &DataCollector{ - logger: logger, + settings: set, metricsStore: &metricsStore{ metricsCache: make(map[types.UID]pmetric.Metrics), }, @@ -85,7 +86,7 @@ func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store c func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) { if err := dc.metricsStore.remove(obj.(runtime.Object)); err != nil { - dc.logger.Error( + dc.settings.TelemetrySettings.Logger.Error( "failed to remove from metric cache", zap.String("obj", reflect.TypeOf(obj).String()), zap.Error(err), @@ -95,7 +96,7 @@ func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) { func (dc *DataCollector) UpdateMetricsStore(obj interface{}, md pmetric.Metrics) { if err := dc.metricsStore.update(obj.(runtime.Object), md); err != nil { - dc.logger.Error( + dc.settings.TelemetrySettings.Logger.Error( "failed to update metric cache", zap.String("obj", reflect.TypeOf(obj).String()), zap.Error(err), @@ -113,9 +114,9 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) { switch o := obj.(type) { case *corev1.Pod: - md = ocsToMetrics(pod.GetMetrics(o, dc.logger)) + md = ocsToMetrics(pod.GetMetrics(o, dc.settings.TelemetrySettings.Logger)) case *corev1.Node: - md = ocsToMetrics(node.GetMetrics(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.logger)) + md = ocsToMetrics(node.GetMetrics(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.settings.TelemetrySettings.Logger)) case *corev1.Namespace: md = ocsToMetrics(namespace.GetMetrics(o)) case *corev1.ReplicationController: @@ -137,7 +138,7 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) { case *batchv1beta1.CronJob: md = ocsToMetrics(cronjob.GetMetricsBeta(o)) case 
*autoscalingv2beta2.HorizontalPodAutoscaler: - md = ocsToMetrics(hpa.GetMetrics(o)) + md = hpa.GetMetrics(dc.settings, o) case *quotav1.ClusterResourceQuota: md = ocsToMetrics(clusterresourcequota.GetMetrics(o)) default: @@ -156,7 +157,7 @@ func (dc *DataCollector) SyncMetadata(obj interface{}) map[experimentalmetricmet km := map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{} switch o := obj.(type) { case *corev1.Pod: - km = pod.GetMetadata(o, dc.metadataStore, dc.logger) + km = pod.GetMetadata(o, dc.metadataStore, dc.settings.TelemetrySettings.Logger) case *corev1.Node: km = node.GetMetadata(o) case *corev1.ReplicationController: diff --git a/receiver/k8sclusterreceiver/internal/collection/collector_test.go b/receiver/k8sclusterreceiver/internal/collection/collector_test.go index 8f72ae458dbd..6a0fedb98be5 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector_test.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -274,10 +275,11 @@ func TestDataCollectorSyncMetadata(t *testing.T) { for _, tt := range tests { observedLogger, _ := observer.New(zapcore.WarnLevel) - logger := zap.New(observedLogger) + set := receivertest.NewNopCreateSettings() + set.TelemetrySettings.Logger = zap.New(observedLogger) t.Run(tt.name, func(t *testing.T) { dc := &DataCollector{ - logger: logger, + settings: set, metadataStore: tt.metadataStore, nodeConditionsToReport: []string{}, } diff --git a/receiver/k8sclusterreceiver/internal/constants/constants.go b/receiver/k8sclusterreceiver/internal/constants/constants.go index f0e4062a4528..2d3efc853a23 100644 --- a/receiver/k8sclusterreceiver/internal/constants/constants.go +++ b/receiver/k8sclusterreceiver/internal/constants/constants.go @@ -23,13 +23,11 @@ const ( // 
Resource labels keys for UID. K8sKeyNamespaceUID = "k8s.namespace.uid" K8sKeyReplicationControllerUID = "k8s.replicationcontroller.uid" - K8sKeyHPAUID = "k8s.hpa.uid" K8sKeyResourceQuotaUID = "k8s.resourcequota.uid" K8sKeyClusterResourceQuotaUID = "openshift.clusterquota.uid" // Resource labels keys for Name. K8sKeyReplicationControllerName = "k8s.replicationcontroller.name" - K8sKeyHPAName = "k8s.hpa.name" K8sKeyResourceQuotaName = "k8s.resourcequota.name" K8sKeyClusterResourceQuotaName = "openshift.clusterquota.name" diff --git a/receiver/k8sclusterreceiver/internal/hpa/doc.go b/receiver/k8sclusterreceiver/internal/hpa/doc.go new file mode 100644 index 000000000000..054677a1e950 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/doc.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !windows +// +build !windows + +//go:generate mdatagen metadata.yaml + +package hpa // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa" diff --git a/receiver/k8sclusterreceiver/internal/hpa/documentation.md b/receiver/k8sclusterreceiver/internal/hpa/documentation.md new file mode 100644 index 000000000000..5a7f46104a6c --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/documentation.md @@ -0,0 +1,53 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) 
+ +# k8s/hpa + +## Default Metrics + +The following metrics are emitted by default. Each of them can be disabled by applying the following configuration: + +```yaml +metrics: + : + enabled: false +``` + +### k8s.hpa.current_replicas + +Current number of pod replicas managed by this autoscaler. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### k8s.hpa.desired_replicas + +Desired number of pod replicas managed by this autoscaler. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### k8s.hpa.max_replicas + +Maximum number of replicas to which the autoscaler can scale up. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### k8s.hpa.min_replicas + +Minimum number of replicas to which the autoscaler can scale down. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +## Resource Attributes + +| Name | Description | Values | Enabled | +| ---- | ----------- | ------ | ------- | +| k8s.hpa.name | The k8s hpa name. | Any Str | true | +| k8s.hpa.uid | The k8s hpa uid. | Any Str | true | +| k8s.namespace.name | The name of the namespace that the pod is running in. 
| Any Str | true | diff --git a/receiver/k8sclusterreceiver/internal/hpa/hpa.go b/receiver/k8sclusterreceiver/internal/hpa/hpa.go index 062aea0cd98b..38c8ade84806 100644 --- a/receiver/k8sclusterreceiver/internal/hpa/hpa.go +++ b/receiver/k8sclusterreceiver/internal/hpa/hpa.go @@ -15,91 +15,27 @@ package hpa // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa" import ( - agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - conventions "go.opentelemetry.io/collector/semconv/v1.6.1" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" + imetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) -var hpaMaxReplicasMetric = &metricspb.MetricDescriptor{ - Name: "k8s.hpa.max_replicas", - Description: "Maximum number of replicas to which the autoscaler can scale up", - Unit: "1", - Type: metricspb.MetricDescriptor_GAUGE_INT64, -} - -var hpaMinReplicasMetric = &metricspb.MetricDescriptor{ - Name: "k8s.hpa.min_replicas", - Description: "Minimum number of replicas to which the autoscaler can scale down", - Unit: "1", - Type: metricspb.MetricDescriptor_GAUGE_INT64, -} - -var 
hpaCurrentReplicasMetric = &metricspb.MetricDescriptor{ - Name: "k8s.hpa.current_replicas", - Description: "Current number of pod replicas managed by this autoscaler", - Unit: "1", - Type: metricspb.MetricDescriptor_GAUGE_INT64, -} - -var hpaDesiredReplicasMetric = &metricspb.MetricDescriptor{ - Name: "k8s.hpa.desired_replicas", - Description: "Desired number of pod replicas managed by this autoscaler", - Unit: "1", - Type: metricspb.MetricDescriptor_GAUGE_INT64, -} - -func GetMetrics(hpa *autoscalingv2beta2.HorizontalPodAutoscaler) []*agentmetricspb.ExportMetricsServiceRequest { - metrics := []*metricspb.Metric{ - { - MetricDescriptor: hpaMaxReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(hpa.Spec.MaxReplicas)), - }, - }, - { - MetricDescriptor: hpaMinReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(*hpa.Spec.MinReplicas)), - }, - }, - { - MetricDescriptor: hpaCurrentReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(hpa.Status.CurrentReplicas)), - }, - }, - { - MetricDescriptor: hpaDesiredReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(hpa.Status.DesiredReplicas)), - }, - }, - } +func GetMetrics(set receiver.CreateSettings, hpa *autoscalingv2beta2.HorizontalPodAutoscaler) pmetric.Metrics { + mb := imetadata.NewMetricsBuilder(imetadata.DefaultMetricsSettings(), set) - return []*agentmetricspb.ExportMetricsServiceRequest{ - { - Resource: getResourceForHPA(hpa), - Metrics: metrics, - }, - } -} - -func getResourceForHPA(hpa *autoscalingv2beta2.HorizontalPodAutoscaler) *resourcepb.Resource { - return &resourcepb.Resource{ - Type: constants.K8sType, - Labels: map[string]string{ - constants.K8sKeyHPAUID: string(hpa.UID), - constants.K8sKeyHPAName: hpa.Name, - conventions.AttributeK8SNamespaceName: hpa.Namespace, - }, - } + ts := pcommon.NewTimestampFromTime(time.Now()) + mb.RecordK8sHpaMaxReplicasDataPoint(ts, 
int64(hpa.Spec.MaxReplicas)) + mb.RecordK8sHpaMinReplicasDataPoint(ts, int64(*hpa.Spec.MinReplicas)) + mb.RecordK8sHpaCurrentReplicasDataPoint(ts, int64(hpa.Status.CurrentReplicas)) + mb.RecordK8sHpaDesiredReplicasDataPoint(ts, int64(hpa.Status.DesiredReplicas)) + return mb.Emit(imetadata.WithK8sHpaUID(string(hpa.UID)), imetadata.WithK8sHpaName(hpa.Name), imetadata.WithK8sNamespaceName(hpa.Namespace)) } func GetMetadata(hpa *autoscalingv2beta2.HorizontalPodAutoscaler) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go b/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go index d005f48d13bf..abcdd5397f91 100644 --- a/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go +++ b/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go @@ -17,39 +17,37 @@ package hpa import ( "testing" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver/receivertest" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestHPAMetrics(t *testing.T) { hpa := testutils.NewHPA("1") - actualResourceMetrics := GetMetrics(hpa) + md := GetMetrics(receivertest.NewNopCreateSettings(), hpa) - require.Equal(t, 1, len(actualResourceMetrics)) - require.Equal(t, 4, len(actualResourceMetrics[0].Metrics)) - - rm := actualResourceMetrics[0] - testutils.AssertResource(t, rm.Resource, constants.K8sType, - map[string]string{ + require.Equal(t, 1, md.ResourceMetrics().Len()) + rm := md.ResourceMetrics().At(0) + assert.Equal(t, + map[string]any{ "k8s.hpa.uid": "test-hpa-1-uid", "k8s.hpa.name": "test-hpa-1", "k8s.namespace.name": "test-namespace", }, - ) - - 
testutils.AssertMetricsInt(t, rm.Metrics[0], "k8s.hpa.max_replicas", - metricspb.MetricDescriptor_GAUGE_INT64, 10) - - testutils.AssertMetricsInt(t, rm.Metrics[1], "k8s.hpa.min_replicas", - metricspb.MetricDescriptor_GAUGE_INT64, 2) - - testutils.AssertMetricsInt(t, rm.Metrics[2], "k8s.hpa.current_replicas", - metricspb.MetricDescriptor_GAUGE_INT64, 5) - - testutils.AssertMetricsInt(t, rm.Metrics[3], "k8s.hpa.desired_replicas", - metricspb.MetricDescriptor_GAUGE_INT64, 7) + rm.Resource().Attributes().AsRaw()) + + require.Equal(t, 1, rm.ScopeMetrics().Len()) + sms := rm.ScopeMetrics().At(0) + require.Equal(t, 4, sms.Metrics().Len()) + sms.Metrics().Sort(func(a, b pmetric.Metric) bool { + return a.Name() < b.Name() + }) + testutils.AssertMetricInt(t, sms.Metrics().At(0), "k8s.hpa.current_replicas", pmetric.MetricTypeGauge, 5) + testutils.AssertMetricInt(t, sms.Metrics().At(1), "k8s.hpa.desired_replicas", pmetric.MetricTypeGauge, 7) + testutils.AssertMetricInt(t, sms.Metrics().At(2), "k8s.hpa.max_replicas", pmetric.MetricTypeGauge, 10) + testutils.AssertMetricInt(t, sms.Metrics().At(3), "k8s.hpa.min_replicas", pmetric.MetricTypeGauge, 2) } diff --git a/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics.go b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics.go new file mode 100644 index 000000000000..36bd46621c6e --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics.go @@ -0,0 +1,469 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + conventions "go.opentelemetry.io/collector/semconv/v1.9.0" +) + +// MetricSettings provides common settings for a particular metric. 
+type MetricSettings struct { + Enabled bool `mapstructure:"enabled"` + + enabledSetByUser bool +} + +func (ms *MetricSettings) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + err := parser.Unmarshal(ms, confmap.WithErrorUnused()) + if err != nil { + return err + } + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +// MetricsSettings provides settings for k8s/hpa metrics. +type MetricsSettings struct { + K8sHpaCurrentReplicas MetricSettings `mapstructure:"k8s.hpa.current_replicas"` + K8sHpaDesiredReplicas MetricSettings `mapstructure:"k8s.hpa.desired_replicas"` + K8sHpaMaxReplicas MetricSettings `mapstructure:"k8s.hpa.max_replicas"` + K8sHpaMinReplicas MetricSettings `mapstructure:"k8s.hpa.min_replicas"` +} + +func DefaultMetricsSettings() MetricsSettings { + return MetricsSettings{ + K8sHpaCurrentReplicas: MetricSettings{ + Enabled: true, + }, + K8sHpaDesiredReplicas: MetricSettings{ + Enabled: true, + }, + K8sHpaMaxReplicas: MetricSettings{ + Enabled: true, + }, + K8sHpaMinReplicas: MetricSettings{ + Enabled: true, + }, + } +} + +// ResourceAttributeSettings provides common settings for a particular metric. +type ResourceAttributeSettings struct { + Enabled bool `mapstructure:"enabled"` + + enabledProvidedByUser bool +} + +func (ras *ResourceAttributeSettings) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + err := parser.Unmarshal(ras, confmap.WithErrorUnused()) + if err != nil { + return err + } + ras.enabledProvidedByUser = parser.IsSet("enabled") + return nil +} + +// ResourceAttributesSettings provides settings for k8s/hpa metrics. 
+type ResourceAttributesSettings struct { + K8sHpaName ResourceAttributeSettings `mapstructure:"k8s.hpa.name"` + K8sHpaUID ResourceAttributeSettings `mapstructure:"k8s.hpa.uid"` + K8sNamespaceName ResourceAttributeSettings `mapstructure:"k8s.namespace.name"` +} + +func DefaultResourceAttributesSettings() ResourceAttributesSettings { + return ResourceAttributesSettings{ + K8sHpaName: ResourceAttributeSettings{ + Enabled: true, + }, + K8sHpaUID: ResourceAttributeSettings{ + Enabled: true, + }, + K8sNamespaceName: ResourceAttributeSettings{ + Enabled: true, + }, + } +} + +type metricK8sHpaCurrentReplicas struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.hpa.current_replicas metric with initial data. +func (m *metricK8sHpaCurrentReplicas) init() { + m.data.SetName("k8s.hpa.current_replicas") + m.data.SetDescription("Current number of pod replicas managed by this autoscaler.") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sHpaCurrentReplicas) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sHpaCurrentReplicas) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricK8sHpaCurrentReplicas) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sHpaCurrentReplicas(settings MetricSettings) metricK8sHpaCurrentReplicas { + m := metricK8sHpaCurrentReplicas{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sHpaDesiredReplicas struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.hpa.desired_replicas metric with initial data. +func (m *metricK8sHpaDesiredReplicas) init() { + m.data.SetName("k8s.hpa.desired_replicas") + m.data.SetDescription("Desired number of pod replicas managed by this autoscaler.") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sHpaDesiredReplicas) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sHpaDesiredReplicas) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricK8sHpaDesiredReplicas) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sHpaDesiredReplicas(settings MetricSettings) metricK8sHpaDesiredReplicas { + m := metricK8sHpaDesiredReplicas{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sHpaMaxReplicas struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.hpa.max_replicas metric with initial data. +func (m *metricK8sHpaMaxReplicas) init() { + m.data.SetName("k8s.hpa.max_replicas") + m.data.SetDescription("Maximum number of replicas to which the autoscaler can scale up.") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sHpaMaxReplicas) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sHpaMaxReplicas) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricK8sHpaMaxReplicas) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sHpaMaxReplicas(settings MetricSettings) metricK8sHpaMaxReplicas { + m := metricK8sHpaMaxReplicas{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sHpaMinReplicas struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.hpa.min_replicas metric with initial data. +func (m *metricK8sHpaMinReplicas) init() { + m.data.SetName("k8s.hpa.min_replicas") + m.data.SetDescription("Minimum number of replicas to which the autoscaler can scale down.") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sHpaMinReplicas) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sHpaMinReplicas) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricK8sHpaMinReplicas) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sHpaMinReplicas(settings MetricSettings) metricK8sHpaMinReplicas { + m := metricK8sHpaMinReplicas{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +// MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations +// required to produce metric representation defined in metadata and user settings. +type MetricsBuilder struct { + startTime pcommon.Timestamp // start time that will be applied to all recorded data points. + metricsCapacity int // maximum observed number of metrics per resource. + resourceCapacity int // maximum observed number of resource attributes. + metricsBuffer pmetric.Metrics // accumulates metrics data before emitting. + buildInfo component.BuildInfo // contains version information + resourceAttributesSettings ResourceAttributesSettings + metricK8sHpaCurrentReplicas metricK8sHpaCurrentReplicas + metricK8sHpaDesiredReplicas metricK8sHpaDesiredReplicas + metricK8sHpaMaxReplicas metricK8sHpaMaxReplicas + metricK8sHpaMinReplicas metricK8sHpaMinReplicas +} + +// metricBuilderOption applies changes to default metrics builder. +type metricBuilderOption func(*MetricsBuilder) + +// WithStartTime sets startTime on the metrics builder. +func WithStartTime(startTime pcommon.Timestamp) metricBuilderOption { + return func(mb *MetricsBuilder) { + mb.startTime = startTime + } +} + +// WithResourceAttributesSettings sets ResourceAttributeSettings on the metrics builder. 
+func WithResourceAttributesSettings(ras ResourceAttributesSettings) metricBuilderOption { + return func(mb *MetricsBuilder) { + mb.resourceAttributesSettings = ras + } +} + +func NewMetricsBuilder(ms MetricsSettings, settings receiver.CreateSettings, options ...metricBuilderOption) *MetricsBuilder { + mb := &MetricsBuilder{ + startTime: pcommon.NewTimestampFromTime(time.Now()), + metricsBuffer: pmetric.NewMetrics(), + buildInfo: settings.BuildInfo, + resourceAttributesSettings: DefaultResourceAttributesSettings(), + metricK8sHpaCurrentReplicas: newMetricK8sHpaCurrentReplicas(ms.K8sHpaCurrentReplicas), + metricK8sHpaDesiredReplicas: newMetricK8sHpaDesiredReplicas(ms.K8sHpaDesiredReplicas), + metricK8sHpaMaxReplicas: newMetricK8sHpaMaxReplicas(ms.K8sHpaMaxReplicas), + metricK8sHpaMinReplicas: newMetricK8sHpaMinReplicas(ms.K8sHpaMinReplicas), + } + for _, op := range options { + op(mb) + } + return mb +} + +// updateCapacity updates max length of metrics and resource attributes that will be used for the slice capacity. +func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) { + if mb.metricsCapacity < rm.ScopeMetrics().At(0).Metrics().Len() { + mb.metricsCapacity = rm.ScopeMetrics().At(0).Metrics().Len() + } + if mb.resourceCapacity < rm.Resource().Attributes().Len() { + mb.resourceCapacity = rm.Resource().Attributes().Len() + } +} + +// ResourceMetricsOption applies changes to provided resource metrics. +type ResourceMetricsOption func(ResourceAttributesSettings, pmetric.ResourceMetrics) + +// WithK8sHpaName sets provided value as "k8s.hpa.name" attribute for current resource. +func WithK8sHpaName(val string) ResourceMetricsOption { + return func(ras ResourceAttributesSettings, rm pmetric.ResourceMetrics) { + if ras.K8sHpaName.Enabled { + rm.Resource().Attributes().PutStr("k8s.hpa.name", val) + } + } +} + +// WithK8sHpaUID sets provided value as "k8s.hpa.uid" attribute for current resource. 
+func WithK8sHpaUID(val string) ResourceMetricsOption { + return func(ras ResourceAttributesSettings, rm pmetric.ResourceMetrics) { + if ras.K8sHpaUID.Enabled { + rm.Resource().Attributes().PutStr("k8s.hpa.uid", val) + } + } +} + +// WithK8sNamespaceName sets provided value as "k8s.namespace.name" attribute for current resource. +func WithK8sNamespaceName(val string) ResourceMetricsOption { + return func(ras ResourceAttributesSettings, rm pmetric.ResourceMetrics) { + if ras.K8sNamespaceName.Enabled { + rm.Resource().Attributes().PutStr("k8s.namespace.name", val) + } + } +} + +// WithStartTimeOverride overrides start time for all the resource metrics data points. +// This option should be only used if different start time has to be set on metrics coming from different resources. +func WithStartTimeOverride(start pcommon.Timestamp) ResourceMetricsOption { + return func(ras ResourceAttributesSettings, rm pmetric.ResourceMetrics) { + var dps pmetric.NumberDataPointSlice + metrics := rm.ScopeMetrics().At(0).Metrics() + for i := 0; i < metrics.Len(); i++ { + switch metrics.At(i).Type() { + case pmetric.MetricTypeGauge: + dps = metrics.At(i).Gauge().DataPoints() + case pmetric.MetricTypeSum: + dps = metrics.At(i).Sum().DataPoints() + } + for j := 0; j < dps.Len(); j++ { + dps.At(j).SetStartTimestamp(start) + } + } + } +} + +// EmitForResource saves all the generated metrics under a new resource and updates the internal state to be ready for +// recording another set of data points as part of another resource. This function can be helpful when one scraper +// needs to emit metrics from several resources. Otherwise calling this function is not required, +// just `Emit` function can be called instead. +// Resource attributes should be provided as ResourceMetricsOption arguments. 
+func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { + rm := pmetric.NewResourceMetrics() + rm.SetSchemaUrl(conventions.SchemaURL) + rm.Resource().Attributes().EnsureCapacity(mb.resourceCapacity) + ils := rm.ScopeMetrics().AppendEmpty() + ils.Scope().SetName("otelcol/k8s/hpa") + ils.Scope().SetVersion(mb.buildInfo.Version) + ils.Metrics().EnsureCapacity(mb.metricsCapacity) + mb.metricK8sHpaCurrentReplicas.emit(ils.Metrics()) + mb.metricK8sHpaDesiredReplicas.emit(ils.Metrics()) + mb.metricK8sHpaMaxReplicas.emit(ils.Metrics()) + mb.metricK8sHpaMinReplicas.emit(ils.Metrics()) + + for _, op := range rmo { + op(mb.resourceAttributesSettings, rm) + } + if ils.Metrics().Len() > 0 { + mb.updateCapacity(rm) + rm.MoveTo(mb.metricsBuffer.ResourceMetrics().AppendEmpty()) + } +} + +// Emit returns all the metrics accumulated by the metrics builder and updates the internal state to be ready for +// recording another set of metrics. This function will be responsible for applying all the transformations required to +// produce metric representation defined in metadata and user settings, e.g. delta or cumulative. +func (mb *MetricsBuilder) Emit(rmo ...ResourceMetricsOption) pmetric.Metrics { + mb.EmitForResource(rmo...) + metrics := pmetric.NewMetrics() + mb.metricsBuffer.MoveTo(metrics) + return metrics +} + +// RecordK8sHpaCurrentReplicasDataPoint adds a data point to k8s.hpa.current_replicas metric. +func (mb *MetricsBuilder) RecordK8sHpaCurrentReplicasDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sHpaCurrentReplicas.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sHpaDesiredReplicasDataPoint adds a data point to k8s.hpa.desired_replicas metric. +func (mb *MetricsBuilder) RecordK8sHpaDesiredReplicasDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sHpaDesiredReplicas.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sHpaMaxReplicasDataPoint adds a data point to k8s.hpa.max_replicas metric. 
+func (mb *MetricsBuilder) RecordK8sHpaMaxReplicasDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sHpaMaxReplicas.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sHpaMinReplicasDataPoint adds a data point to k8s.hpa.min_replicas metric. +func (mb *MetricsBuilder) RecordK8sHpaMinReplicasDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sHpaMinReplicas.recordDataPoint(mb.startTime, ts, val) +} + +// Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted, +// and metrics builder should update its startTime and reset it's internal state accordingly. +func (mb *MetricsBuilder) Reset(options ...metricBuilderOption) { + mb.startTime = pcommon.NewTimestampFromTime(time.Now()) + for _, op := range options { + op(mb) + } +} diff --git a/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics_test.go b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics_test.go new file mode 100644 index 000000000000..5f953eeeda29 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics_test.go @@ -0,0 +1,185 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package metadata + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +type testMetricsSet int + +const ( + testMetricsSetDefault testMetricsSet = iota + testMetricsSetAll + testMetricsSetNo +) + +func TestMetricsBuilder(t *testing.T) { + tests := []struct { + name string + metricsSet testMetricsSet + }{ + { + name: "default", + metricsSet: testMetricsSetDefault, + }, + { + name: "all_metrics", + metricsSet: testMetricsSetAll, + }, + { + name: "no_metrics", + metricsSet: testMetricsSetNo, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + start := pcommon.Timestamp(1_000_000_000) + ts := pcommon.Timestamp(1_000_001_000) + observedZapCore, observedLogs := observer.New(zap.WarnLevel) + settings := receivertest.NewNopCreateSettings() + settings.Logger = zap.New(observedZapCore) + mb := NewMetricsBuilder(loadConfig(t, test.name), settings, WithStartTime(start)) + + expectedWarnings := 0 + assert.Equal(t, expectedWarnings, observedLogs.Len()) + + defaultMetricsCount := 0 + allMetricsCount := 0 + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sHpaCurrentReplicasDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sHpaDesiredReplicasDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sHpaMaxReplicasDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sHpaMinReplicasDataPoint(ts, 1) + + metrics := mb.Emit(WithK8sHpaName("attr-val"), WithK8sHpaUID("attr-val"), WithK8sNamespaceName("attr-val")) + + if test.metricsSet == testMetricsSetNo { + assert.Equal(t, 0, 
metrics.ResourceMetrics().Len()) + return + } + + assert.Equal(t, 1, metrics.ResourceMetrics().Len()) + rm := metrics.ResourceMetrics().At(0) + attrCount := 0 + enabledAttrCount := 0 + attrVal, ok := rm.Resource().Attributes().Get("k8s.hpa.name") + attrCount++ + assert.Equal(t, mb.resourceAttributesSettings.K8sHpaName.Enabled, ok) + if mb.resourceAttributesSettings.K8sHpaName.Enabled { + enabledAttrCount++ + assert.EqualValues(t, "attr-val", attrVal.Str()) + } + attrVal, ok = rm.Resource().Attributes().Get("k8s.hpa.uid") + attrCount++ + assert.Equal(t, mb.resourceAttributesSettings.K8sHpaUID.Enabled, ok) + if mb.resourceAttributesSettings.K8sHpaUID.Enabled { + enabledAttrCount++ + assert.EqualValues(t, "attr-val", attrVal.Str()) + } + attrVal, ok = rm.Resource().Attributes().Get("k8s.namespace.name") + attrCount++ + assert.Equal(t, mb.resourceAttributesSettings.K8sNamespaceName.Enabled, ok) + if mb.resourceAttributesSettings.K8sNamespaceName.Enabled { + enabledAttrCount++ + assert.EqualValues(t, "attr-val", attrVal.Str()) + } + assert.Equal(t, enabledAttrCount, rm.Resource().Attributes().Len()) + assert.Equal(t, attrCount, 3) + + assert.Equal(t, 1, rm.ScopeMetrics().Len()) + ms := rm.ScopeMetrics().At(0).Metrics() + if test.metricsSet == testMetricsSetDefault { + assert.Equal(t, defaultMetricsCount, ms.Len()) + } + if test.metricsSet == testMetricsSetAll { + assert.Equal(t, allMetricsCount, ms.Len()) + } + validatedMetrics := make(map[string]bool) + for i := 0; i < ms.Len(); i++ { + switch ms.At(i).Name() { + case "k8s.hpa.current_replicas": + assert.False(t, validatedMetrics["k8s.hpa.current_replicas"], "Found a duplicate in the metrics slice: k8s.hpa.current_replicas") + validatedMetrics["k8s.hpa.current_replicas"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Current number of pod replicas managed by this autoscaler.", ms.At(i).Description()) + assert.Equal(t, "1", 
ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.hpa.desired_replicas": + assert.False(t, validatedMetrics["k8s.hpa.desired_replicas"], "Found a duplicate in the metrics slice: k8s.hpa.desired_replicas") + validatedMetrics["k8s.hpa.desired_replicas"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Desired number of pod replicas managed by this autoscaler.", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.hpa.max_replicas": + assert.False(t, validatedMetrics["k8s.hpa.max_replicas"], "Found a duplicate in the metrics slice: k8s.hpa.max_replicas") + validatedMetrics["k8s.hpa.max_replicas"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Maximum number of replicas to which the autoscaler can scale up.", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.hpa.min_replicas": + assert.False(t, validatedMetrics["k8s.hpa.min_replicas"], "Found a duplicate in the metrics slice: k8s.hpa.min_replicas") + validatedMetrics["k8s.hpa.min_replicas"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, 
ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Minimum number of replicas to which the autoscaler can scale up.", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + } + } + }) + } +} + +func loadConfig(t *testing.T, name string) MetricsSettings { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + sub, err := cm.Sub(name) + require.NoError(t, err) + cfg := DefaultMetricsSettings() + require.NoError(t, component.UnmarshalConfig(sub, &cfg)) + return cfg +} diff --git a/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/testdata/config.yaml b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/testdata/config.yaml new file mode 100644 index 000000000000..2c649711f33c --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/testdata/config.yaml @@ -0,0 +1,19 @@ +default: +all_metrics: + k8s.hpa.current_replicas: + enabled: true + k8s.hpa.desired_replicas: + enabled: true + k8s.hpa.max_replicas: + enabled: true + k8s.hpa.min_replicas: + enabled: true +no_metrics: + k8s.hpa.current_replicas: + enabled: false + k8s.hpa.desired_replicas: + enabled: false + k8s.hpa.max_replicas: + enabled: false + k8s.hpa.min_replicas: + enabled: false diff --git a/receiver/k8sclusterreceiver/internal/hpa/metadata.yaml b/receiver/k8sclusterreceiver/internal/hpa/metadata.yaml new file mode 100644 index 000000000000..893414ceceb5 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/metadata.yaml @@ -0,0 +1,48 @@ +name: k8s/hpa + +sem_conv_version: 1.9.0 + +resource_attributes: + k8s.hpa.uid: + description: The k8s hpa uid. + type: string + enabled: true + + k8s.hpa.name: + description: The k8s hpa name. 
+    type: string
+    enabled: true
+
+  k8s.namespace.name:
+    description: The name of the namespace that the pod is running in.
+    type: string
+    enabled: true
+
+metrics:
+  k8s.hpa.max_replicas:
+    enabled: true
+    description: Maximum number of replicas to which the autoscaler can scale up.
+    unit: 1
+    gauge:
+      value_type: int
+
+  k8s.hpa.min_replicas:
+    enabled: true
+    description: Minimum number of replicas to which the autoscaler can scale down.
+    unit: 1
+    gauge:
+      value_type: int
+
+  k8s.hpa.current_replicas:
+    enabled: true
+    description: Current number of pod replicas managed by this autoscaler.
+    unit: 1
+    gauge:
+      value_type: int
+
+  k8s.hpa.desired_replicas:
+    enabled: true
+    description: Desired number of pod replicas managed by this autoscaler.
+    unit: 1
+    gauge:
+      value_type: int
diff --git a/receiver/k8sclusterreceiver/internal/testutils/metrics.go b/receiver/k8sclusterreceiver/internal/testutils/metrics.go
index 35e22cefa6d8..94b941d8b7e4 100644
--- a/receiver/k8sclusterreceiver/internal/testutils/metrics.go
+++ b/receiver/k8sclusterreceiver/internal/testutils/metrics.go
@@ -20,6 +20,7 @@ import (
 	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
 	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/pmetric"
 )
 
 func AssertResource(t *testing.T, actualResource *resourcepb.Resource,
@@ -37,6 +38,26 @@ func AssertResource(t *testing.T, actualResource *resourcepb.Resource,
 	)
 }
 
+func AssertMetricInt(t testing.TB, m pmetric.Metric, expectedMetric string, expectedType pmetric.MetricType, expectedValue any) {
+	dps := assertMetric(t, m, expectedMetric, expectedType)
+	require.EqualValues(t, expectedValue, dps.At(0).IntValue(), "mismatching metric values")
+}
+
+func assertMetric(t testing.TB, m pmetric.Metric, expectedMetric string, expectedType pmetric.MetricType) pmetric.NumberDataPointSlice {
+	require.Equal(t, expectedMetric,
m.Name(), "mismatching metric names") + require.NotEmpty(t, m.Description(), "empty description on metric") + require.Equal(t, expectedType, m.Type(), "mismatching metric types") + var dps pmetric.NumberDataPointSlice + switch expectedType { + case pmetric.MetricTypeGauge: + dps = m.Gauge().DataPoints() + case pmetric.MetricTypeSum: + dps = m.Sum().DataPoints() + } + require.Equal(t, 1, dps.Len()) + return dps +} + func AssertMetricsWithLabels(t *testing.T, actualMetric *metricspb.Metric, expectedMetric string, expectedType metricspb.MetricDescriptor_Type, expectedLabels map[string]string, expectedValue int64) { diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index cb3ad132ff5c..35c2d5005764 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -129,7 +129,7 @@ func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.C return nil, err } return &kubernetesReceiver{ - resourceWatcher: newResourceWatcher(set.Logger, rCfg), + resourceWatcher: newResourceWatcher(set, rCfg), settings: set, config: rCfg, consumer: consumer, diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index 4569c8556ce0..f60cbc7a8e34 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -23,6 +23,7 @@ import ( quotaclientset "github.com/openshift/client-go/quota/clientset/versioned" quotainformersv1 "github.com/openshift/client-go/quota/informers/externalversions" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver" "go.uber.org/atomic" "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -64,10 +65,10 @@ type resourceWatcher struct { type metadataConsumer func(metadata []*experimentalmetricmetadata.MetadataUpdate) error // newResourceWatcher creates a Kubernetes resource watcher. 
-func newResourceWatcher(logger *zap.Logger, cfg *Config) *resourceWatcher { +func newResourceWatcher(set receiver.CreateSettings, cfg *Config) *resourceWatcher { return &resourceWatcher{ - logger: logger, - dataCollector: collection.NewDataCollector(logger, cfg.NodeConditionTypesToReport, cfg.AllocatableTypesToReport), + logger: set.Logger, + dataCollector: collection.NewDataCollector(set, cfg.NodeConditionTypesToReport, cfg.AllocatableTypesToReport), initialSyncDone: atomic.NewBool(false), initialSyncTimedOut: atomic.NewBool(false), initialTimeout: defaultInitialSyncTimeout, diff --git a/receiver/k8sclusterreceiver/watcher_test.go b/receiver/k8sclusterreceiver/watcher_test.go index fe0df886e8cf..03f6c7d236a1 100644 --- a/receiver/k8sclusterreceiver/watcher_test.go +++ b/receiver/k8sclusterreceiver/watcher_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -199,7 +200,7 @@ func TestPrepareSharedInformerFactory(t *testing.T) { rw := &resourceWatcher{ client: newFakeClientWithAllResources(), logger: obsLogger, - dataCollector: collection.NewDataCollector(zap.NewNop(), []string{}, []string{}), + dataCollector: collection.NewDataCollector(receivertest.NewNopCreateSettings(), []string{}, []string{}), } assert.NoError(t, rw.prepareSharedInformerFactory())