diff --git a/.chloggen/switchk8snode.yaml b/.chloggen/switchk8snode.yaml new file mode 100755 index 000000000000..394965aad460 --- /dev/null +++ b/.chloggen/switchk8snode.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sclusterreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Switch k8s.node metrics to use pdata. + +# One or more tracking issues related to the change +issues: [23438] diff --git a/receiver/k8sclusterreceiver/go.mod b/receiver/k8sclusterreceiver/go.mod index 031d7c30058e..be5ded43ff18 100644 --- a/receiver/k8sclusterreceiver/go.mod +++ b/receiver/k8sclusterreceiver/go.mod @@ -6,7 +6,6 @@ require ( github.com/census-instrumentation/opencensus-proto v0.4.1 github.com/google/go-cmp v0.5.9 github.com/google/uuid v1.3.0 - github.com/iancoleman/strcase v0.2.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.79.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.79.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.79.0 diff --git a/receiver/k8sclusterreceiver/go.sum b/receiver/k8sclusterreceiver/go.sum index 0cb9d46bf876..75fff9744a16 100644 --- a/receiver/k8sclusterreceiver/go.sum +++ b/receiver/k8sclusterreceiver/go.sum @@ -298,8 +298,6 @@ github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKe github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs= github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0= -github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= diff --git a/receiver/k8sclusterreceiver/internal/collection/collector.go b/receiver/k8sclusterreceiver/internal/collection/collector.go index 4554232bb568..06b916ae1f76 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector.go @@ -106,7 +106,7 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) { case *corev1.Pod: md = ocsToMetrics(pod.GetMetrics(o, dc.settings.TelemetrySettings.Logger)) case *corev1.Node: - md = ocsToMetrics(node.GetMetrics(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.settings.TelemetrySettings.Logger)) + md = node.GetMetrics(dc.settings, o, dc.nodeConditionsToReport, dc.allocatableTypesToReport) case *corev1.Namespace: md = namespace.GetMetrics(dc.settings, o) case *corev1.ReplicationController: diff --git a/receiver/k8sclusterreceiver/internal/node/doc.go b/receiver/k8sclusterreceiver/internal/node/doc.go new file mode 100644 index 000000000000..5e313b2c2628 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/node/doc.go @@ -0,0 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:generate mdatagen 
metadata.yaml + +package node // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node" diff --git a/receiver/k8sclusterreceiver/internal/node/documentation.md b/receiver/k8sclusterreceiver/internal/node/documentation.md new file mode 100644 index 000000000000..4e01b5342300 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/node/documentation.md @@ -0,0 +1,93 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# k8s/node + +## Default Metrics + +The following metrics are emitted by default. Each of them can be disabled by applying the following configuration: + +```yaml +metrics: + <metric_name>: + enabled: false +``` + +### k8s.node.allocatable_cpu + +How many CPU cores remaining that the node can allocate to pods + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {cores} | Gauge | Double | + +### k8s.node.allocatable_ephemeral_storage + +How many bytes of ephemeral storage remaining that the node can allocate to pods + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| By | Gauge | Int | + +### k8s.node.allocatable_memory + +How many bytes of RAM memory remaining that the node can allocate to pods + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| By | Gauge | Int | + +### k8s.node.allocatable_storage + +How many bytes of storage remaining that the node can allocate to pods + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| By | Gauge | Int | + +### k8s.node.condition_disk_pressure + +Whether this node is DiskPressure (1), not DiskPressure (0) or in an unknown state (-1) + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### k8s.node.condition_memory_pressure + +Whether this node is MemoryPressure (1), not MemoryPressure (0) or in an unknown state (-1) + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### k8s.node.condition_network_unavailable + +Whether this node is NetworkUnavailable (1), not NetworkUnavailable (0) or in an unknown state (-1) + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### k8s.node.condition_pid_pressure + +Whether this node is PidPressure (1), not PidPressure (0) or in an unknown state (-1) + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### k8s.node.condition_ready + +Whether this node is Ready (1), not Ready (0) or in an unknown state (-1) + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +## Resource Attributes + +| Name | Description | Values | Enabled | +| ---- | ----------- | ------ | ------- | +| k8s.node.name | The k8s node name. | Any Str | true | +| k8s.node.uid | The k8s node uid. | Any Str | true | +| opencensus.resourcetype | The OpenCensus resource type. | Any Str | true | diff --git a/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_config.go b/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_config.go new file mode 100644 index 000000000000..9ff31fb67b7a --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_config.go @@ -0,0 +1,108 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import "go.opentelemetry.io/collector/confmap" + +// MetricConfig provides common config for a particular metric.
+type MetricConfig struct { + Enabled bool `mapstructure:"enabled"` + + enabledSetByUser bool +} + +func (ms *MetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + err := parser.Unmarshal(ms, confmap.WithErrorUnused()) + if err != nil { + return err + } + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +// MetricsConfig provides config for k8s/node metrics. +type MetricsConfig struct { + K8sNodeAllocatableCPU MetricConfig `mapstructure:"k8s.node.allocatable_cpu"` + K8sNodeAllocatableEphemeralStorage MetricConfig `mapstructure:"k8s.node.allocatable_ephemeral_storage"` + K8sNodeAllocatableMemory MetricConfig `mapstructure:"k8s.node.allocatable_memory"` + K8sNodeAllocatableStorage MetricConfig `mapstructure:"k8s.node.allocatable_storage"` + K8sNodeConditionDiskPressure MetricConfig `mapstructure:"k8s.node.condition_disk_pressure"` + K8sNodeConditionMemoryPressure MetricConfig `mapstructure:"k8s.node.condition_memory_pressure"` + K8sNodeConditionNetworkUnavailable MetricConfig `mapstructure:"k8s.node.condition_network_unavailable"` + K8sNodeConditionPidPressure MetricConfig `mapstructure:"k8s.node.condition_pid_pressure"` + K8sNodeConditionReady MetricConfig `mapstructure:"k8s.node.condition_ready"` +} + +func DefaultMetricsConfig() MetricsConfig { + return MetricsConfig{ + K8sNodeAllocatableCPU: MetricConfig{ + Enabled: true, + }, + K8sNodeAllocatableEphemeralStorage: MetricConfig{ + Enabled: true, + }, + K8sNodeAllocatableMemory: MetricConfig{ + Enabled: true, + }, + K8sNodeAllocatableStorage: MetricConfig{ + Enabled: true, + }, + K8sNodeConditionDiskPressure: MetricConfig{ + Enabled: true, + }, + K8sNodeConditionMemoryPressure: MetricConfig{ + Enabled: true, + }, + K8sNodeConditionNetworkUnavailable: MetricConfig{ + Enabled: true, + }, + K8sNodeConditionPidPressure: MetricConfig{ + Enabled: true, + }, + K8sNodeConditionReady: MetricConfig{ + Enabled: true, + }, + } +} + +// ResourceAttributeConfig provides common config for a particular resource attribute. +type ResourceAttributeConfig struct { + Enabled bool `mapstructure:"enabled"` +} + +// ResourceAttributesConfig provides config for k8s/node resource attributes. +type ResourceAttributesConfig struct { + K8sNodeName ResourceAttributeConfig `mapstructure:"k8s.node.name"` + K8sNodeUID ResourceAttributeConfig `mapstructure:"k8s.node.uid"` + OpencensusResourcetype ResourceAttributeConfig `mapstructure:"opencensus.resourcetype"` +} + +func DefaultResourceAttributesConfig() ResourceAttributesConfig { + return ResourceAttributesConfig{ + K8sNodeName: ResourceAttributeConfig{ + Enabled: true, + }, + K8sNodeUID: ResourceAttributeConfig{ + Enabled: true, + }, + OpencensusResourcetype: ResourceAttributeConfig{ + Enabled: true, + }, + } +} + +// MetricsBuilderConfig is a configuration for k8s/node metrics builder. 
+type MetricsBuilderConfig struct { + Metrics MetricsConfig `mapstructure:"metrics"` + ResourceAttributes ResourceAttributesConfig `mapstructure:"resource_attributes"` +} + +func DefaultMetricsBuilderConfig() MetricsBuilderConfig { + return MetricsBuilderConfig{ + Metrics: DefaultMetricsConfig(), + ResourceAttributes: DefaultResourceAttributesConfig(), + } +} diff --git a/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_config_test.go b/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_config_test.go new file mode 100644 index 000000000000..d37893d1b0f2 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_config_test.go @@ -0,0 +1,86 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func TestMetricsBuilderConfig(t *testing.T) { + tests := []struct { + name string + want MetricsBuilderConfig + }{ + { + name: "default", + want: DefaultMetricsBuilderConfig(), + }, + { + name: "all_set", + want: MetricsBuilderConfig{ + Metrics: MetricsConfig{ + K8sNodeAllocatableCPU: MetricConfig{Enabled: true}, + K8sNodeAllocatableEphemeralStorage: MetricConfig{Enabled: true}, + K8sNodeAllocatableMemory: MetricConfig{Enabled: true}, + K8sNodeAllocatableStorage: MetricConfig{Enabled: true}, + K8sNodeConditionDiskPressure: MetricConfig{Enabled: true}, + K8sNodeConditionMemoryPressure: MetricConfig{Enabled: true}, + K8sNodeConditionNetworkUnavailable: MetricConfig{Enabled: true}, + K8sNodeConditionPidPressure: MetricConfig{Enabled: true}, + K8sNodeConditionReady: MetricConfig{Enabled: true}, + }, + ResourceAttributes: ResourceAttributesConfig{ + K8sNodeName: ResourceAttributeConfig{Enabled: true}, + K8sNodeUID: ResourceAttributeConfig{Enabled: true}, + OpencensusResourcetype: ResourceAttributeConfig{Enabled: true}, + }, + }, + }, + { + name: "none_set", + want: MetricsBuilderConfig{ + Metrics: MetricsConfig{ + K8sNodeAllocatableCPU: MetricConfig{Enabled: false}, + K8sNodeAllocatableEphemeralStorage: MetricConfig{Enabled: false}, + K8sNodeAllocatableMemory: MetricConfig{Enabled: false}, + K8sNodeAllocatableStorage: MetricConfig{Enabled: false}, + K8sNodeConditionDiskPressure: MetricConfig{Enabled: false}, + K8sNodeConditionMemoryPressure: MetricConfig{Enabled: false}, + K8sNodeConditionNetworkUnavailable: MetricConfig{Enabled: false}, + K8sNodeConditionPidPressure: MetricConfig{Enabled: false}, + K8sNodeConditionReady: MetricConfig{Enabled: false}, + }, + ResourceAttributes: ResourceAttributesConfig{ + K8sNodeName: ResourceAttributeConfig{Enabled: false}, + K8sNodeUID: ResourceAttributeConfig{Enabled: false}, + OpencensusResourcetype: ResourceAttributeConfig{Enabled: false}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := loadMetricsBuilderConfig(t, tt.name) + if diff := cmp.Diff(tt.want, cfg, cmpopts.IgnoreUnexported(MetricConfig{}, ResourceAttributeConfig{})); diff != "" { + t.Errorf("Config mismatch (-expected +actual):\n%s", diff) + } + }) + } +} + +func loadMetricsBuilderConfig(t *testing.T, name string) MetricsBuilderConfig { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + sub, err := cm.Sub(name) + require.NoError(t, err) + cfg := 
DefaultMetricsBuilderConfig() + require.NoError(t, component.UnmarshalConfig(sub, &cfg)) + return cfg +} diff --git a/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_metrics.go b/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_metrics.go new file mode 100644 index 000000000000..83bd70999fee --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_metrics.go @@ -0,0 +1,662 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + conventions "go.opentelemetry.io/collector/semconv/v1.18.0" +) + +type metricK8sNodeAllocatableCPU struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.allocatable_cpu metric with initial data. +func (m *metricK8sNodeAllocatableCPU) init() { + m.data.SetName("k8s.node.allocatable_cpu") + m.data.SetDescription("How many CPU cores remaining that the node can allocate to pods") + m.data.SetUnit("{cores}") + m.data.SetEmptyGauge() +} + +func (m *metricK8sNodeAllocatableCPU) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetDoubleValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sNodeAllocatableCPU) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeAllocatableCPU) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeAllocatableCPU(cfg MetricConfig) metricK8sNodeAllocatableCPU { + m := metricK8sNodeAllocatableCPU{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeAllocatableEphemeralStorage struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.allocatable_ephemeral_storage metric with initial data. +func (m *metricK8sNodeAllocatableEphemeralStorage) init() { + m.data.SetName("k8s.node.allocatable_ephemeral_storage") + m.data.SetDescription("How many bytes of ephemeral storage remaining that the node can allocate to pods") + m.data.SetUnit("By") + m.data.SetEmptyGauge() +} + +func (m *metricK8sNodeAllocatableEphemeralStorage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. 
+func (m *metricK8sNodeAllocatableEphemeralStorage) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeAllocatableEphemeralStorage) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeAllocatableEphemeralStorage(cfg MetricConfig) metricK8sNodeAllocatableEphemeralStorage { + m := metricK8sNodeAllocatableEphemeralStorage{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeAllocatableMemory struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.allocatable_memory metric with initial data. +func (m *metricK8sNodeAllocatableMemory) init() { + m.data.SetName("k8s.node.allocatable_memory") + m.data.SetDescription("How many bytes of RAM memory remaining that the node can allocate to pods") + m.data.SetUnit("By") + m.data.SetEmptyGauge() +} + +func (m *metricK8sNodeAllocatableMemory) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sNodeAllocatableMemory) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeAllocatableMemory) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeAllocatableMemory(cfg MetricConfig) metricK8sNodeAllocatableMemory { + m := metricK8sNodeAllocatableMemory{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeAllocatableStorage struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.allocatable_storage metric with initial data. +func (m *metricK8sNodeAllocatableStorage) init() { + m.data.SetName("k8s.node.allocatable_storage") + m.data.SetDescription("How many bytes of storage remaining that the node can allocate to pods") + m.data.SetUnit("By") + m.data.SetEmptyGauge() +} + +func (m *metricK8sNodeAllocatableStorage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. 
+func (m *metricK8sNodeAllocatableStorage) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeAllocatableStorage) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeAllocatableStorage(cfg MetricConfig) metricK8sNodeAllocatableStorage { + m := metricK8sNodeAllocatableStorage{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeConditionDiskPressure struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.condition_disk_pressure metric with initial data. +func (m *metricK8sNodeConditionDiskPressure) init() { + m.data.SetName("k8s.node.condition_disk_pressure") + m.data.SetDescription("Whether this node is DiskPressure (1), not DiskPressure (0) or in an unknown state (-1)") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sNodeConditionDiskPressure) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sNodeConditionDiskPressure) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeConditionDiskPressure) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeConditionDiskPressure(cfg MetricConfig) metricK8sNodeConditionDiskPressure { + m := metricK8sNodeConditionDiskPressure{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeConditionMemoryPressure struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.condition_memory_pressure metric with initial data. +func (m *metricK8sNodeConditionMemoryPressure) init() { + m.data.SetName("k8s.node.condition_memory_pressure") + m.data.SetDescription("Whether this node is MemoryPressure (1), not MemoryPressure (0) or in an unknown state (-1)") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sNodeConditionMemoryPressure) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. 
+func (m *metricK8sNodeConditionMemoryPressure) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeConditionMemoryPressure) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeConditionMemoryPressure(cfg MetricConfig) metricK8sNodeConditionMemoryPressure { + m := metricK8sNodeConditionMemoryPressure{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeConditionNetworkUnavailable struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.condition_network_unavailable metric with initial data. +func (m *metricK8sNodeConditionNetworkUnavailable) init() { + m.data.SetName("k8s.node.condition_network_unavailable") + m.data.SetDescription("Whether this node is NetworkUnavailable (1), not NetworkUnavailable (0) or in an unknown state (-1)") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sNodeConditionNetworkUnavailable) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sNodeConditionNetworkUnavailable) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeConditionNetworkUnavailable) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeConditionNetworkUnavailable(cfg MetricConfig) metricK8sNodeConditionNetworkUnavailable { + m := metricK8sNodeConditionNetworkUnavailable{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeConditionPidPressure struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.condition_pid_pressure metric with initial data. +func (m *metricK8sNodeConditionPidPressure) init() { + m.data.SetName("k8s.node.condition_pid_pressure") + m.data.SetDescription("Whether this node is PidPressure (1), not PidPressure (0) or in an unknown state (-1)") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sNodeConditionPidPressure) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. 
+func (m *metricK8sNodeConditionPidPressure) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeConditionPidPressure) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeConditionPidPressure(cfg MetricConfig) metricK8sNodeConditionPidPressure { + m := metricK8sNodeConditionPidPressure{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeConditionReady struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.condition_ready metric with initial data. +func (m *metricK8sNodeConditionReady) init() { + m.data.SetName("k8s.node.condition_ready") + m.data.SetDescription("Whether this node is Ready (1), not Ready (0) or in an unknown state (-1)") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sNodeConditionReady) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sNodeConditionReady) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeConditionReady) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeConditionReady(cfg MetricConfig) metricK8sNodeConditionReady { + m := metricK8sNodeConditionReady{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +// MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations +// required to produce metric representation defined in metadata and user config. +type MetricsBuilder struct { + startTime pcommon.Timestamp // start time that will be applied to all recorded data points. + metricsCapacity int // maximum observed number of metrics per resource. + resourceCapacity int // maximum observed number of resource attributes. + metricsBuffer pmetric.Metrics // accumulates metrics data before emitting. 
+ buildInfo component.BuildInfo // contains version information + resourceAttributesConfig ResourceAttributesConfig + metricK8sNodeAllocatableCPU metricK8sNodeAllocatableCPU + metricK8sNodeAllocatableEphemeralStorage metricK8sNodeAllocatableEphemeralStorage + metricK8sNodeAllocatableMemory metricK8sNodeAllocatableMemory + metricK8sNodeAllocatableStorage metricK8sNodeAllocatableStorage + metricK8sNodeConditionDiskPressure metricK8sNodeConditionDiskPressure + metricK8sNodeConditionMemoryPressure metricK8sNodeConditionMemoryPressure + metricK8sNodeConditionNetworkUnavailable metricK8sNodeConditionNetworkUnavailable + metricK8sNodeConditionPidPressure metricK8sNodeConditionPidPressure + metricK8sNodeConditionReady metricK8sNodeConditionReady +} + +// metricBuilderOption applies changes to default metrics builder. +type metricBuilderOption func(*MetricsBuilder) + +// WithStartTime sets startTime on the metrics builder. +func WithStartTime(startTime pcommon.Timestamp) metricBuilderOption { + return func(mb *MetricsBuilder) { + mb.startTime = startTime + } +} + +func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.CreateSettings, options ...metricBuilderOption) *MetricsBuilder { + mb := &MetricsBuilder{ + startTime: pcommon.NewTimestampFromTime(time.Now()), + metricsBuffer: pmetric.NewMetrics(), + buildInfo: settings.BuildInfo, + resourceAttributesConfig: mbc.ResourceAttributes, + metricK8sNodeAllocatableCPU: newMetricK8sNodeAllocatableCPU(mbc.Metrics.K8sNodeAllocatableCPU), + metricK8sNodeAllocatableEphemeralStorage: newMetricK8sNodeAllocatableEphemeralStorage(mbc.Metrics.K8sNodeAllocatableEphemeralStorage), + metricK8sNodeAllocatableMemory: newMetricK8sNodeAllocatableMemory(mbc.Metrics.K8sNodeAllocatableMemory), + metricK8sNodeAllocatableStorage: newMetricK8sNodeAllocatableStorage(mbc.Metrics.K8sNodeAllocatableStorage), + metricK8sNodeConditionDiskPressure: newMetricK8sNodeConditionDiskPressure(mbc.Metrics.K8sNodeConditionDiskPressure), + metricK8sNodeConditionMemoryPressure: newMetricK8sNodeConditionMemoryPressure(mbc.Metrics.K8sNodeConditionMemoryPressure), + metricK8sNodeConditionNetworkUnavailable: newMetricK8sNodeConditionNetworkUnavailable(mbc.Metrics.K8sNodeConditionNetworkUnavailable), + metricK8sNodeConditionPidPressure: newMetricK8sNodeConditionPidPressure(mbc.Metrics.K8sNodeConditionPidPressure), + metricK8sNodeConditionReady: newMetricK8sNodeConditionReady(mbc.Metrics.K8sNodeConditionReady), + } + for _, op := range options { + op(mb) + } + return mb +} + +// updateCapacity updates max length of metrics and resource attributes that will be used for the slice capacity. +func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) { + if mb.metricsCapacity < rm.ScopeMetrics().At(0).Metrics().Len() { + mb.metricsCapacity = rm.ScopeMetrics().At(0).Metrics().Len() + } + if mb.resourceCapacity < rm.Resource().Attributes().Len() { + mb.resourceCapacity = rm.Resource().Attributes().Len() + } +} + +// ResourceMetricsOption applies changes to provided resource metrics. +type ResourceMetricsOption func(ResourceAttributesConfig, pmetric.ResourceMetrics) + +// WithK8sNodeName sets provided value as "k8s.node.name" attribute for current resource. +func WithK8sNodeName(val string) ResourceMetricsOption { + return func(rac ResourceAttributesConfig, rm pmetric.ResourceMetrics) { + if rac.K8sNodeName.Enabled { + rm.Resource().Attributes().PutStr("k8s.node.name", val) + } + } +} + +// WithK8sNodeUID sets provided value as "k8s.node.uid" attribute for current resource. 
+func WithK8sNodeUID(val string) ResourceMetricsOption { + return func(rac ResourceAttributesConfig, rm pmetric.ResourceMetrics) { + if rac.K8sNodeUID.Enabled { + rm.Resource().Attributes().PutStr("k8s.node.uid", val) + } + } +} + +// WithOpencensusResourcetype sets provided value as "opencensus.resourcetype" attribute for current resource. +func WithOpencensusResourcetype(val string) ResourceMetricsOption { + return func(rac ResourceAttributesConfig, rm pmetric.ResourceMetrics) { + if rac.OpencensusResourcetype.Enabled { + rm.Resource().Attributes().PutStr("opencensus.resourcetype", val) + } + } +} + +// WithStartTimeOverride overrides start time for all the resource metrics data points. +// This option should be only used if different start time has to be set on metrics coming from different resources. +func WithStartTimeOverride(start pcommon.Timestamp) ResourceMetricsOption { + return func(_ ResourceAttributesConfig, rm pmetric.ResourceMetrics) { + var dps pmetric.NumberDataPointSlice + metrics := rm.ScopeMetrics().At(0).Metrics() + for i := 0; i < metrics.Len(); i++ { + switch metrics.At(i).Type() { + case pmetric.MetricTypeGauge: + dps = metrics.At(i).Gauge().DataPoints() + case pmetric.MetricTypeSum: + dps = metrics.At(i).Sum().DataPoints() + } + for j := 0; j < dps.Len(); j++ { + dps.At(j).SetStartTimestamp(start) + } + } + } +} + +// EmitForResource saves all the generated metrics under a new resource and updates the internal state to be ready for +// recording another set of data points as part of another resource. This function can be helpful when one scraper +// needs to emit metrics from several resources. Otherwise calling this function is not required, +// just `Emit` function can be called instead. +// Resource attributes should be provided as ResourceMetricsOption arguments. +func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { + rm := pmetric.NewResourceMetrics() + rm.SetSchemaUrl(conventions.SchemaURL) + rm.Resource().Attributes().EnsureCapacity(mb.resourceCapacity) + ils := rm.ScopeMetrics().AppendEmpty() + ils.Scope().SetName("otelcol/k8sclusterreceiver") + ils.Scope().SetVersion(mb.buildInfo.Version) + ils.Metrics().EnsureCapacity(mb.metricsCapacity) + mb.metricK8sNodeAllocatableCPU.emit(ils.Metrics()) + mb.metricK8sNodeAllocatableEphemeralStorage.emit(ils.Metrics()) + mb.metricK8sNodeAllocatableMemory.emit(ils.Metrics()) + mb.metricK8sNodeAllocatableStorage.emit(ils.Metrics()) + mb.metricK8sNodeConditionDiskPressure.emit(ils.Metrics()) + mb.metricK8sNodeConditionMemoryPressure.emit(ils.Metrics()) + mb.metricK8sNodeConditionNetworkUnavailable.emit(ils.Metrics()) + mb.metricK8sNodeConditionPidPressure.emit(ils.Metrics()) + mb.metricK8sNodeConditionReady.emit(ils.Metrics()) + + for _, op := range rmo { + op(mb.resourceAttributesConfig, rm) + } + if ils.Metrics().Len() > 0 { + mb.updateCapacity(rm) + rm.MoveTo(mb.metricsBuffer.ResourceMetrics().AppendEmpty()) + } +} + +// Emit returns all the metrics accumulated by the metrics builder and updates the internal state to be ready for +// recording another set of metrics. This function will be responsible for applying all the transformations required to +// produce metric representation defined in metadata and user config, e.g. delta or cumulative. +func (mb *MetricsBuilder) Emit(rmo ...ResourceMetricsOption) pmetric.Metrics { + mb.EmitForResource(rmo...) 
+ metrics := mb.metricsBuffer + mb.metricsBuffer = pmetric.NewMetrics() + return metrics +} + +// RecordK8sNodeAllocatableCPUDataPoint adds a data point to k8s.node.allocatable_cpu metric. +func (mb *MetricsBuilder) RecordK8sNodeAllocatableCPUDataPoint(ts pcommon.Timestamp, val float64) { + mb.metricK8sNodeAllocatableCPU.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeAllocatableEphemeralStorageDataPoint adds a data point to k8s.node.allocatable_ephemeral_storage metric. +func (mb *MetricsBuilder) RecordK8sNodeAllocatableEphemeralStorageDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeAllocatableEphemeralStorage.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeAllocatableMemoryDataPoint adds a data point to k8s.node.allocatable_memory metric. +func (mb *MetricsBuilder) RecordK8sNodeAllocatableMemoryDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeAllocatableMemory.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeAllocatableStorageDataPoint adds a data point to k8s.node.allocatable_storage metric. +func (mb *MetricsBuilder) RecordK8sNodeAllocatableStorageDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeAllocatableStorage.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeConditionDiskPressureDataPoint adds a data point to k8s.node.condition_disk_pressure metric. +func (mb *MetricsBuilder) RecordK8sNodeConditionDiskPressureDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeConditionDiskPressure.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeConditionMemoryPressureDataPoint adds a data point to k8s.node.condition_memory_pressure metric. +func (mb *MetricsBuilder) RecordK8sNodeConditionMemoryPressureDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeConditionMemoryPressure.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeConditionNetworkUnavailableDataPoint adds a data point to k8s.node.condition_network_unavailable metric. +func (mb *MetricsBuilder) RecordK8sNodeConditionNetworkUnavailableDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeConditionNetworkUnavailable.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeConditionPidPressureDataPoint adds a data point to k8s.node.condition_pid_pressure metric. +func (mb *MetricsBuilder) RecordK8sNodeConditionPidPressureDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeConditionPidPressure.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeConditionReadyDataPoint adds a data point to k8s.node.condition_ready metric. +func (mb *MetricsBuilder) RecordK8sNodeConditionReadyDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeConditionReady.recordDataPoint(mb.startTime, ts, val) +} + +// Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted, +// and metrics builder should update its startTime and reset it's internal state accordingly. +func (mb *MetricsBuilder) Reset(options ...metricBuilderOption) { + mb.startTime = pcommon.NewTimestampFromTime(time.Now()) + for _, op := range options { + op(mb) + } +} diff --git a/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_metrics_test.go b/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_metrics_test.go new file mode 100644 index 000000000000..4ac2ed6c1f58 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/node/internal/metadata/generated_metrics_test.go @@ -0,0 +1,251 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +type testConfigCollection int + +const ( + testSetDefault testConfigCollection = iota + testSetAll + testSetNone +) + +func TestMetricsBuilder(t *testing.T) { + tests := []struct { + name string + configSet testConfigCollection + }{ + { + name: "default", + configSet: testSetDefault, + }, + { + name: "all_set", + configSet: testSetAll, + }, + { + name: "none_set", + configSet: testSetNone, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + start := pcommon.Timestamp(1_000_000_000) + ts := pcommon.Timestamp(1_000_001_000) + observedZapCore, observedLogs := observer.New(zap.WarnLevel) + settings := receivertest.NewNopCreateSettings() + settings.Logger = zap.New(observedZapCore) + mb := NewMetricsBuilder(loadMetricsBuilderConfig(t, test.name), settings, WithStartTime(start)) + + expectedWarnings := 0 + assert.Equal(t, expectedWarnings, observedLogs.Len()) + + defaultMetricsCount := 0 + allMetricsCount := 0 + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeAllocatableCPUDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeAllocatableEphemeralStorageDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeAllocatableMemoryDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeAllocatableStorageDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeConditionDiskPressureDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeConditionMemoryPressureDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeConditionNetworkUnavailableDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeConditionPidPressureDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeConditionReadyDataPoint(ts, 1) + + metrics := mb.Emit(WithK8sNodeName("attr-val"), WithK8sNodeUID("attr-val"), WithOpencensusResourcetype("attr-val")) + + if test.configSet == testSetNone { + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + return + } + + assert.Equal(t, 1, metrics.ResourceMetrics().Len()) + rm := metrics.ResourceMetrics().At(0) + attrCount := 0 + enabledAttrCount := 0 + attrVal, ok := rm.Resource().Attributes().Get("k8s.node.name") + attrCount++ + assert.Equal(t, mb.resourceAttributesConfig.K8sNodeName.Enabled, ok) + if mb.resourceAttributesConfig.K8sNodeName.Enabled { + enabledAttrCount++ + assert.EqualValues(t, "attr-val", attrVal.Str()) + } + attrVal, ok = rm.Resource().Attributes().Get("k8s.node.uid") + attrCount++ + assert.Equal(t, mb.resourceAttributesConfig.K8sNodeUID.Enabled, ok) + if mb.resourceAttributesConfig.K8sNodeUID.Enabled { + enabledAttrCount++ + assert.EqualValues(t, "attr-val", attrVal.Str()) + } + attrVal, ok = rm.Resource().Attributes().Get("opencensus.resourcetype") + attrCount++ + assert.Equal(t, mb.resourceAttributesConfig.OpencensusResourcetype.Enabled, ok) + if mb.resourceAttributesConfig.OpencensusResourcetype.Enabled { + enabledAttrCount++ + assert.EqualValues(t, "attr-val", attrVal.Str()) + } + assert.Equal(t, enabledAttrCount, rm.Resource().Attributes().Len()) + assert.Equal(t, attrCount, 3) + + assert.Equal(t, 1, rm.ScopeMetrics().Len()) + ms := 
rm.ScopeMetrics().At(0).Metrics() + if test.configSet == testSetDefault { + assert.Equal(t, defaultMetricsCount, ms.Len()) + } + if test.configSet == testSetAll { + assert.Equal(t, allMetricsCount, ms.Len()) + } + validatedMetrics := make(map[string]bool) + for i := 0; i < ms.Len(); i++ { + switch ms.At(i).Name() { + case "k8s.node.allocatable_cpu": + assert.False(t, validatedMetrics["k8s.node.allocatable_cpu"], "Found a duplicate in the metrics slice: k8s.node.allocatable_cpu") + validatedMetrics["k8s.node.allocatable_cpu"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "How many CPU cores remaining that the node can allocate to pods", ms.At(i).Description()) + assert.Equal(t, "{cores}", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.Equal(t, float64(1), dp.DoubleValue()) + case "k8s.node.allocatable_ephemeral_storage": + assert.False(t, validatedMetrics["k8s.node.allocatable_ephemeral_storage"], "Found a duplicate in the metrics slice: k8s.node.allocatable_ephemeral_storage") + validatedMetrics["k8s.node.allocatable_ephemeral_storage"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "How many bytes of ephemeral storage remaining that the node can allocate to pods", ms.At(i).Description()) + assert.Equal(t, "By", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.node.allocatable_memory": + assert.False(t, validatedMetrics["k8s.node.allocatable_memory"], "Found a duplicate in the metrics slice: k8s.node.allocatable_memory") + validatedMetrics["k8s.node.allocatable_memory"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "How many bytes of RAM memory remaining that the node can allocate to pods", ms.At(i).Description()) + assert.Equal(t, "By", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.node.allocatable_storage": + assert.False(t, validatedMetrics["k8s.node.allocatable_storage"], "Found a duplicate in the metrics slice: k8s.node.allocatable_storage") + validatedMetrics["k8s.node.allocatable_storage"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "How many bytes of storage remaining that the node can allocate to pods", ms.At(i).Description()) + assert.Equal(t, "By", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.node.condition_disk_pressure": + assert.False(t, validatedMetrics["k8s.node.condition_disk_pressure"], "Found a duplicate in the metrics slice: 
k8s.node.condition_disk_pressure") + validatedMetrics["k8s.node.condition_disk_pressure"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Whether this node is DiskPressure (1), not DiskPressure (0) or in an unknown state (-1)", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.node.condition_memory_pressure": + assert.False(t, validatedMetrics["k8s.node.condition_memory_pressure"], "Found a duplicate in the metrics slice: k8s.node.condition_memory_pressure") + validatedMetrics["k8s.node.condition_memory_pressure"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Whether this node is MemoryPressure (1), not MemoryPressure (0) or in an unknown state (-1)", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.node.condition_network_unavailable": + assert.False(t, validatedMetrics["k8s.node.condition_network_unavailable"], "Found a duplicate in the metrics slice: k8s.node.condition_network_unavailable") + validatedMetrics["k8s.node.condition_network_unavailable"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Whether this node is NetworkUnavailable (1), not NetworkUnavailable (0) or in an unknown state (-1)", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.node.condition_pid_pressure": + assert.False(t, validatedMetrics["k8s.node.condition_pid_pressure"], "Found a duplicate in the metrics slice: k8s.node.condition_pid_pressure") + validatedMetrics["k8s.node.condition_pid_pressure"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Whether this node is PidPressure (1), not PidPressure (0) or in an unknown state (-1)", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.node.condition_ready": + assert.False(t, validatedMetrics["k8s.node.condition_ready"], "Found a duplicate in the metrics slice: k8s.node.condition_ready") + validatedMetrics["k8s.node.condition_ready"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Whether this node is Ready (1), not Ready (0) or in an unknown state (-1)", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := 
ms.At(i).Gauge().DataPoints().At(0)
+			assert.Equal(t, start, dp.StartTimestamp())
+			assert.Equal(t, ts, dp.Timestamp())
+			assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType())
+			assert.Equal(t, int64(1), dp.IntValue())
+			}
+		}
+		})
+	}
+}
diff --git a/receiver/k8sclusterreceiver/internal/node/internal/metadata/testdata/config.yaml b/receiver/k8sclusterreceiver/internal/node/internal/metadata/testdata/config.yaml
new file mode 100644
index 000000000000..181c13801344
--- /dev/null
+++ b/receiver/k8sclusterreceiver/internal/node/internal/metadata/testdata/config.yaml
@@ -0,0 +1,55 @@
+default:
+all_set:
+  metrics:
+    k8s.node.allocatable_cpu:
+      enabled: true
+    k8s.node.allocatable_ephemeral_storage:
+      enabled: true
+    k8s.node.allocatable_memory:
+      enabled: true
+    k8s.node.allocatable_storage:
+      enabled: true
+    k8s.node.condition_disk_pressure:
+      enabled: true
+    k8s.node.condition_memory_pressure:
+      enabled: true
+    k8s.node.condition_network_unavailable:
+      enabled: true
+    k8s.node.condition_pid_pressure:
+      enabled: true
+    k8s.node.condition_ready:
+      enabled: true
+  resource_attributes:
+    k8s.node.name:
+      enabled: true
+    k8s.node.uid:
+      enabled: true
+    opencensus.resourcetype:
+      enabled: true
+none_set:
+  metrics:
+    k8s.node.allocatable_cpu:
+      enabled: false
+    k8s.node.allocatable_ephemeral_storage:
+      enabled: false
+    k8s.node.allocatable_memory:
+      enabled: false
+    k8s.node.allocatable_storage:
+      enabled: false
+    k8s.node.condition_disk_pressure:
+      enabled: false
+    k8s.node.condition_memory_pressure:
+      enabled: false
+    k8s.node.condition_network_unavailable:
+      enabled: false
+    k8s.node.condition_pid_pressure:
+      enabled: false
+    k8s.node.condition_ready:
+      enabled: false
+  resource_attributes:
+    k8s.node.name:
+      enabled: false
+    k8s.node.uid:
+      enabled: false
+    opencensus.resourcetype:
+      enabled: false
diff --git a/receiver/k8sclusterreceiver/internal/node/metadata.yaml b/receiver/k8sclusterreceiver/internal/node/metadata.yaml
new file mode 100644
index 000000000000..d0d745587914
--- /dev/null
+++ b/receiver/k8sclusterreceiver/internal/node/metadata.yaml
@@ -0,0 +1,75 @@
+type: k8s/node
+
+sem_conv_version: 1.18.0
+
+resource_attributes:
+  k8s.node.uid:
+    description: The k8s node uid.
+    type: string
+    enabled: true
+
+  k8s.node.name:
+    description: The k8s node name.
+    type: string
+    enabled: true
+
+  opencensus.resourcetype:
+    description: The OpenCensus resource type.
+    type: string
+    enabled: true
+
+metrics:
+  k8s.node.condition_ready:
+    enabled: true
+    description: Whether this node is Ready (1), not Ready (0) or in an unknown state (-1)
+    unit: 1
+    gauge:
+      value_type: int
+  k8s.node.condition_memory_pressure:
+    enabled: true
+    description: Whether this node is MemoryPressure (1), not MemoryPressure (0) or in an unknown state (-1)
+    unit: 1
+    gauge:
+      value_type: int
+  k8s.node.condition_disk_pressure:
+    enabled: true
+    description: Whether this node is DiskPressure (1), not DiskPressure (0) or in an unknown state (-1)
+    unit: 1
+    gauge:
+      value_type: int
+  k8s.node.condition_pid_pressure:
+    enabled: true
+    description: Whether this node is PidPressure (1), not PidPressure (0) or in an unknown state (-1)
+    unit: 1
+    gauge:
+      value_type: int
+  k8s.node.condition_network_unavailable:
+    enabled: true
+    description: Whether this node is NetworkUnavailable (1), not NetworkUnavailable (0) or in an unknown state (-1)
+    unit: 1
+    gauge:
+      value_type: int
+  k8s.node.allocatable_cpu:
+    enabled: true
+    description: How many CPU cores remaining that the node can allocate to pods
+    unit: "{cores}"
+    gauge:
+      value_type: double
+  k8s.node.allocatable_memory:
+    enabled: true
+    description: How many bytes of RAM memory remaining that the node can allocate to pods
+    unit: "By"
+    gauge:
+      value_type: int
+  k8s.node.allocatable_ephemeral_storage:
+    enabled: true
+    description: How many bytes of ephemeral storage remaining that the node can allocate to pods
+    unit: "By"
+    gauge:
+      value_type: int
+  k8s.node.allocatable_storage:
+    enabled: true
+    description: How many bytes of storage remaining that the node can allocate to pods
+    unit: "By"
+    gauge:
+      value_type: int
\ No newline at end of file
diff --git a/receiver/k8sclusterreceiver/internal/node/nodes.go b/receiver/k8sclusterreceiver/internal/node/nodes.go
index 62dee34d8597..a8baba80b73f 100644
--- a/receiver/k8sclusterreceiver/internal/node/nodes.go
+++ b/receiver/k8sclusterreceiver/internal/node/nodes.go
@@ -7,19 +7,17 @@ import (
 	"fmt"
 	"time"
 
-	agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
-	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
-	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
-	"github.com/iancoleman/strcase"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.opentelemetry.io/collector/receiver"
 	conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
 	"go.uber.org/zap"
 	corev1 "k8s.io/api/core/v1"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils"
+	imetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node/internal/metadata"
 )
 
 const (
@@ -27,13 +25,6 @@ const (
 	nodeCreationTime = "node.creation_timestamp"
 )
 
-var allocatableDesciption = map[string]string{
-	"cpu":               "How many CPU cores remaining that the node can allocate to pods",
-	"memory":            "How many bytes of RAM memory remaining that the node can allocate to pods",
-	"ephemeral-storage": "How many bytes of ephemeral storage remaining that the node can allocate to pods",
-	"storage":           "How many bytes of storage remaining that the node can allocate to pods",
-}
-
 // Transform transforms the node to remove the fields that we don't use to reduce RAM utilization.
 // IMPORTANT: Make sure to update this function before using new node fields.
 func Transform(node *corev1.Node) *corev1.Node {
@@ -52,79 +43,52 @@ func Transform(node *corev1.Node) *corev1.Node {
 	return newNode
 }
 
-func GetMetrics(node *corev1.Node, nodeConditionTypesToReport, allocatableTypesToReport []string, logger *zap.Logger) []*agentmetricspb.ExportMetricsServiceRequest {
-	metrics := make([]*metricspb.Metric, 0, len(nodeConditionTypesToReport)+len(allocatableTypesToReport))
+func GetMetrics(set receiver.CreateSettings, node *corev1.Node, nodeConditionTypesToReport, allocatableTypesToReport []string) pmetric.Metrics {
+	mb := imetadata.NewMetricsBuilder(imetadata.DefaultMetricsBuilderConfig(), set)
+	ts := pcommon.NewTimestampFromTime(time.Now())
+
+	// Adding 'node condition type' metrics
 	for _, nodeConditionTypeValue := range nodeConditionTypesToReport {
-		nodeConditionMetric := getNodeConditionMetric(nodeConditionTypeValue)
 		v1NodeConditionTypeValue := corev1.NodeConditionType(nodeConditionTypeValue)
-
-		metrics = append(metrics, &metricspb.Metric{
-			MetricDescriptor: &metricspb.MetricDescriptor{
-				Name: nodeConditionMetric,
-				Description: fmt.Sprintf("Whether this node is %s (1), "+
-					"not %s (0) or in an unknown state (-1)", nodeConditionTypeValue, nodeConditionTypeValue),
-				Type: metricspb.MetricDescriptor_GAUGE_INT64,
-			},
-			Timeseries: []*metricspb.TimeSeries{
-				utils.GetInt64TimeSeries(nodeConditionValue(node, v1NodeConditionTypeValue)),
-			},
-		})
+		switch v1NodeConditionTypeValue {
+		case corev1.NodeReady:
+			mb.RecordK8sNodeConditionReadyDataPoint(ts, nodeConditionValue(node, v1NodeConditionTypeValue))
+		case corev1.NodeMemoryPressure:
+			mb.RecordK8sNodeConditionMemoryPressureDataPoint(ts, nodeConditionValue(node, v1NodeConditionTypeValue))
+		case corev1.NodeDiskPressure:
+			mb.RecordK8sNodeConditionDiskPressureDataPoint(ts, nodeConditionValue(node, v1NodeConditionTypeValue))
+		case corev1.NodeNetworkUnavailable:
+			mb.RecordK8sNodeConditionNetworkUnavailableDataPoint(ts, nodeConditionValue(node, v1NodeConditionTypeValue))
+		case corev1.NodePIDPressure:
+			mb.RecordK8sNodeConditionPidPressureDataPoint(ts, nodeConditionValue(node, v1NodeConditionTypeValue))
+		default:
+			set.Logger.Warn("unknown node condition type", zap.String("conditionType", nodeConditionTypeValue))
+		}
 	}
 
 	// Adding 'node allocatable type' metrics
 	for _, nodeAllocatableTypeValue := range allocatableTypesToReport {
-		nodeAllocatableMetric := getNodeAllocatableMetric(nodeAllocatableTypeValue)
 		v1NodeAllocatableTypeValue := corev1.ResourceName(nodeAllocatableTypeValue)
-		valType := metricspb.MetricDescriptor_GAUGE_INT64
 		quantity, ok := node.Status.Allocatable[v1NodeAllocatableTypeValue]
 		if !ok {
-			logger.Debug(fmt.Errorf("allocatable type %v not found in node %v", nodeAllocatableTypeValue,
+			set.Logger.Debug(fmt.Errorf("allocatable type %v not found in node %v", nodeAllocatableTypeValue,
 				node.GetName()).Error())
 			continue
 		}
-		val := utils.GetInt64TimeSeries(quantity.Value())
-		if v1NodeAllocatableTypeValue == corev1.ResourceCPU {
+		switch v1NodeAllocatableTypeValue {
+		case corev1.ResourceCPU:
 			// cpu metrics must be of the double type to adhere to opentelemetry system.cpu metric specifications
-			val = utils.GetDoubleTimeSeries(float64(quantity.MilliValue()) / 1000.0)
-			valType = metricspb.MetricDescriptor_GAUGE_DOUBLE
+			mb.RecordK8sNodeAllocatableCPUDataPoint(ts, float64(quantity.MilliValue())/1000.0)
+		case corev1.ResourceMemory:
+			mb.RecordK8sNodeAllocatableMemoryDataPoint(ts, quantity.Value())
+		case corev1.ResourceEphemeralStorage:
+			mb.RecordK8sNodeAllocatableEphemeralStorageDataPoint(ts, quantity.Value())
+		case corev1.ResourceStorage:
+			mb.RecordK8sNodeAllocatableStorageDataPoint(ts, quantity.Value())
 		}
-		metrics = append(metrics, &metricspb.Metric{
-			MetricDescriptor: &metricspb.MetricDescriptor{
-				Name: nodeAllocatableMetric,
-				Description: allocatableDesciption[v1NodeAllocatableTypeValue.String()],
-				Type: valType,
-			},
-			Timeseries: []*metricspb.TimeSeries{
-				val,
-			},
-		})
 	}
+	return mb.Emit(imetadata.WithK8sNodeUID(string(node.UID)), imetadata.WithK8sNodeName(node.Name), imetadata.WithOpencensusResourcetype("k8s"))
-	return []*agentmetricspb.ExportMetricsServiceRequest{
-		{
-			Resource: getResourceForNode(node),
-			Metrics:  metrics,
-		},
-	}
-}
-
-func getNodeConditionMetric(nodeConditionTypeValue string) string {
-	return fmt.Sprintf("k8s.node.condition_%s", strcase.ToSnake(nodeConditionTypeValue))
-}
-
-func getNodeAllocatableMetric(nodeAllocatableTypeValue string) string {
-	return fmt.Sprintf("k8s.node.allocatable_%s", strcase.ToSnake(nodeAllocatableTypeValue))
-}
-
-func getResourceForNode(node *corev1.Node) *resourcepb.Resource {
-	return &resourcepb.Resource{
-		Type: constants.K8sType,
-		Labels: map[string]string{
-			conventions.AttributeK8SNodeUID:  string(node.UID),
-			conventions.AttributeK8SNodeName: node.Name,
-		},
-	}
 }
 
 var nodeConditionValues = map[corev1.ConditionStatus]int64{
diff --git a/receiver/k8sclusterreceiver/internal/node/nodes_test.go b/receiver/k8sclusterreceiver/internal/node/nodes_test.go
index cb75c7f0b340..52432ef0a432 100644
--- a/receiver/k8sclusterreceiver/internal/node/nodes_test.go
+++ b/receiver/k8sclusterreceiver/internal/node/nodes_test.go
@@ -4,78 +4,34 @@ package node
 
 import (
+	"path/filepath"
 	"testing"
 
-	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"go.uber.org/zap"
+	"go.opentelemetry.io/collector/receiver/receivertest"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
 )
 
 func TestNodeMetricsReportCPUMetrics(t *testing.T) {
 	n := testutils.NewNode("1")
-
-	actualResourceMetrics := GetMetrics(n, []string{"Ready", "MemoryPressure"}, []string{"cpu", "memory", "ephemeral-storage", "storage"}, zap.NewNop())
-
-	require.Equal(t, 1, len(actualResourceMetrics))
-
-	require.Equal(t, 5, len(actualResourceMetrics[0].Metrics))
-	testutils.AssertResource(t, actualResourceMetrics[0].Resource, constants.K8sType,
-		map[string]string{
-			"k8s.node.uid":  "test-node-1-uid",
-			"k8s.node.name": "test-node-1",
-		},
+	m := GetMetrics(receivertest.NewNopCreateSettings(), n, []string{"Ready", "MemoryPressure"}, []string{"cpu", "memory", "ephemeral-storage", "storage"})
+	expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml"))
+	require.NoError(t, err)
+	require.NoError(t, pmetrictest.CompareMetrics(expected, m,
+		pmetrictest.IgnoreTimestamp(),
+		pmetrictest.IgnoreStartTimestamp(),
+		pmetrictest.IgnoreResourceMetricsOrder(),
+		pmetrictest.IgnoreMetricsOrder(),
+		pmetrictest.IgnoreScopeMetricsOrder(),
+		),
 	)
-
-	testutils.AssertMetricsInt(t, actualResourceMetrics[0].Metrics[0], "k8s.node.condition_ready",
-		metricspb.MetricDescriptor_GAUGE_INT64, 1)
-
-	testutils.AssertMetricsInt(t, actualResourceMetrics[0].Metrics[1], "k8s.node.condition_memory_pressure",
-		metricspb.MetricDescriptor_GAUGE_INT64, 0)
-
-	testutils.AssertMetricsDouble(t, actualResourceMetrics[0].Metrics[2], "k8s.node.allocatable_cpu",
-		metricspb.MetricDescriptor_GAUGE_DOUBLE, 0.123)
-
-	testutils.AssertMetricsInt(t, actualResourceMetrics[0].Metrics[3], "k8s.node.allocatable_memory",
-		metricspb.MetricDescriptor_GAUGE_INT64, 456)
-
-	testutils.AssertMetricsInt(t, actualResourceMetrics[0].Metrics[4], "k8s.node.allocatable_ephemeral_storage",
-		metricspb.MetricDescriptor_GAUGE_INT64, 1234)
-}
-
-func TestGetNodeConditionMetric(t *testing.T) {
-	tests := []struct {
-		name                   string
-		nodeConditionTypeValue string
-		want                   string
-	}{
-		{"Metric for Node condition Ready",
-			"Ready",
-			"k8s.node.condition_ready",
-		},
-		{"Metric for Node condition MemoryPressure",
-			"MemoryPressure",
-			"k8s.node.condition_memory_pressure",
-		},
-		{"Metric for Node condition DiskPressure",
-			"DiskPressure",
-			"k8s.node.condition_disk_pressure",
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			if got := getNodeConditionMetric(tt.nodeConditionTypeValue); got != tt.want {
-				t.Errorf("getNodeConditionMetric() = %v, want %v", got, tt.want)
-			}
-		})
-	}
 }
 
 func TestNodeConditionValue(t *testing.T) {
diff --git a/receiver/k8sclusterreceiver/internal/node/testdata/expected.yaml b/receiver/k8sclusterreceiver/internal/node/testdata/expected.yaml
new file mode 100644
index 000000000000..11e560137fbc
--- /dev/null
+++ b/receiver/k8sclusterreceiver/internal/node/testdata/expected.yaml
@@ -0,0 +1,48 @@
+resourceMetrics:
+  - resource:
+      attributes:
+        - key: k8s.node.name
+          value:
+            stringValue: test-node-1
+        - key: k8s.node.uid
+          value:
+            stringValue: test-node-1-uid
+        - key: opencensus.resourcetype
+          value:
+            stringValue: k8s
+    schemaUrl: https://opentelemetry.io/schemas/1.18.0
+    scopeMetrics:
+      - metrics:
+          - description: Whether this node is Ready (1), not Ready (0) or in an unknown state (-1)
+            gauge:
+              dataPoints:
+                - asInt: "1"
+            name: k8s.node.condition_ready
+            unit: "1"
+          - description: Whether this node is MemoryPressure (1), not MemoryPressure (0) or in an unknown state (-1)
+            gauge:
+              dataPoints:
+                - asInt: "0"
+            name: k8s.node.condition_memory_pressure
+            unit: "1"
+          - description: How many CPU cores remaining that the node can allocate to pods
+            gauge:
+              dataPoints:
+                - asDouble: 0.123
+            name: k8s.node.allocatable_cpu
+            unit: "{cores}"
+          - description: How many bytes of RAM memory remaining that the node can allocate to pods
+            gauge:
+              dataPoints:
+                - asInt: "456"
+            name: k8s.node.allocatable_memory
+            unit: "By"
+          - description: How many bytes of ephemeral storage remaining that the node can allocate to pods
+            gauge:
+              dataPoints:
+                - asInt: "1234"
+            name: k8s.node.allocatable_ephemeral_storage
+            unit: "By"
+        scope:
+          name: otelcol/k8sclusterreceiver
+          version: latest
\ No newline at end of file
diff --git a/receiver/k8sclusterreceiver/testdata/e2e/expected.yaml b/receiver/k8sclusterreceiver/testdata/e2e/expected.yaml
index d89fd2fc9ad8..369d80b13eb8 100644
--- a/receiver/k8sclusterreceiver/testdata/e2e/expected.yaml
+++ b/receiver/k8sclusterreceiver/testdata/e2e/expected.yaml
@@ -130,6 +130,7 @@ resourceMetrics:
         - key: opencensus.resourcetype
           value:
             stringValue: k8s
+    schemaUrl: "https://opentelemetry.io/schemas/1.18.0"
     scopeMetrics:
       - metrics:
          - description: Whether this node is Ready (1), not Ready (0) or in an unknown state (-1)
@@ -138,7 +139,10 @@ resourceMetrics:
                - asInt: "1"
                  timeUnixNano: "1686772769034865545"
            name: k8s.node.condition_ready
-        scope: {}
+            unit: "1"
+        scope:
+          name: otelcol/k8sclusterreceiver
+          version: latest
   - resource:
       attributes:
         - key: k8s.daemonset.name