diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f1a5919087d0..6ecaf3350f5a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -102,6 +102,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix the "api-version query parameter (?api-version=) is required for all requests" error in Azure Billing. {pull}37158[37158] - Add memory hard limit from container metadata and remove usage percentage in AWS Fargate. {pull}37194[37194] - Ignore parser errors from unsupported metrics types on Prometheus client and continue parsing until EOF is reached {pull}37383[37383] +- Fix the reference time rounding on Azure Metrics {issue}37204[37204] {pull}37365[37365] *Osquerybeat* diff --git a/x-pack/metricbeat/module/azure/azure.go b/x-pack/metricbeat/module/azure/azure.go index 7812feed838c..dd7f121b2697 100644 --- a/x-pack/metricbeat/module/azure/azure.go +++ b/x-pack/metricbeat/module/azure/azure.go @@ -96,9 +96,14 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { // depending on metric time grain (check `MetricRegistry` // for more information). // - // We truncate the reference time to the second to avoid millisecond - // variations in the collection period causing skipped collections. - referenceTime := time.Now().UTC().Truncate(time.Second) + // We round the reference time to the nearest second to avoid + // millisecond variations in the collection period causing + // skipped collections. + // + // See "Round outer limits" and "Round inner limits" tests in + // the metric_registry_test.go for more information. + //referenceTime := time.Now().UTC().Round(time.Second) + referenceTime := time.Now().UTC() // Initialize cloud resources and monitor metrics // information. diff --git a/x-pack/metricbeat/module/azure/client.go b/x-pack/metricbeat/module/azure/client.go index ce9a6cb824fc..3b22a5713cd3 100644 --- a/x-pack/metricbeat/module/azure/client.go +++ b/x-pack/metricbeat/module/azure/client.go @@ -16,110 +16,6 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -// NewMetricRegistry instantiates a new metric registry. -func NewMetricRegistry(logger *logp.Logger) *MetricRegistry { - return &MetricRegistry{ - logger: logger, - collectionsInfo: make(map[string]MetricCollectionInfo), - } -} - -// MetricRegistry keeps track of the last time a metric was collected and -// the time grain used. -// -// This is used to avoid collecting the same metric values over and over again -// when the time grain is larger than the collection interval. -type MetricRegistry struct { - logger *logp.Logger - collectionsInfo map[string]MetricCollectionInfo -} - -// Update updates the metric registry with the latest timestamp and -// time grain for the given metric. -func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) { - m.collectionsInfo[m.buildMetricKey(metric)] = info -} - -// NeedsUpdate returns true if the metric needs to be collected again -// for the given `referenceTime`. -func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool { - // Build a key to store the metric in the registry. - // The key is a combination of the namespace, - // resource ID and metric names. - metricKey := m.buildMetricKey(metric) - - // Get the now time in UTC, only to be used for logging. - // It's interesting to see when the registry evaluate each - // metric in relation to the reference time. - now := time.Now().UTC() - - if collection, exists := m.collectionsInfo[metricKey]; exists { - // Turn the time grain into a duration (for example, PT5M -> 5 minutes). - timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain) - - // Calculate the start time of the time grain in relation to - // the reference time. - timeGrainStartTime := referenceTime.Add(-timeGrainDuration) - - // If the last collection time is after the start time of the time grain, - // it means that we already have a value for the given time grain. - // - // In this case, the metricset does not need to collect the metric - // values again. - if collection.timestamp.After(timeGrainStartTime) { - m.logger.Debugw( - "MetricRegistry: Metric does not need an update", - "needs_update", false, - "reference_time", referenceTime, - "now", now, - "time_grain_start_time", timeGrainStartTime, - "last_collection_at", collection.timestamp, - ) - - return false - } - - // The last collection time is before the start time of the time grain, - // it means that the metricset needs to collect the metric values again. - m.logger.Debugw( - "MetricRegistry: Metric needs an update", - "needs_update", true, - "reference_time", referenceTime, - "now", now, - "time_grain_start_time", timeGrainStartTime, - "last_collection_at", collection.timestamp, - ) - - return true - } - - // If the metric is not in the registry, it means that it has never - // been collected before. - // - // In this case, we need to collect the metric. - m.logger.Debugw( - "MetricRegistry: Metric needs an update", - "needs_update", true, - "reference_time", referenceTime, - "now", now, - ) - - return true -} - -// buildMetricKey builds a key for the metric registry. -// -// The key is a combination of the namespace, resource ID and metric names. -func (m *MetricRegistry) buildMetricKey(metric Metric) string { - keyComponents := []string{ - metric.Namespace, - metric.ResourceId, - } - keyComponents = append(keyComponents, metric.Names...) - - return strings.Join(keyComponents, ",") -} - // MetricCollectionInfo contains information about the last time // a metric was collected and the time grain used. type MetricCollectionInfo struct { diff --git a/x-pack/metricbeat/module/azure/client_utils.go b/x-pack/metricbeat/module/azure/client_utils.go index 986125ba6b68..114ccd95baf2 100644 --- a/x-pack/metricbeat/module/azure/client_utils.go +++ b/x-pack/metricbeat/module/azure/client_utils.go @@ -135,14 +135,14 @@ func compareMetricValues(metVal *float64, metricVal *float64) bool { return false } -// convertTimeGrainToDuration converts the Azure time grain options to the equivalent +// asDuration converts the Azure time grain options to the equivalent // `time.Duration` value. // // For example, converts "PT1M" to `time.Minute`. // // See https://docs.microsoft.com/en-us/azure/azure-monitor/platform/metrics-supported#time-grain // for more information. -func convertTimeGrainToDuration(timeGrain string) time.Duration { +func asDuration(timeGrain string) time.Duration { var duration time.Duration switch timeGrain { case "PT1M": diff --git a/x-pack/metricbeat/module/azure/metric_registry.go b/x-pack/metricbeat/module/azure/metric_registry.go new file mode 100644 index 000000000000..cdaa9496b5d6 --- /dev/null +++ b/x-pack/metricbeat/module/azure/metric_registry.go @@ -0,0 +1,125 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package azure + +import ( + "strings" + "time" + + "github.com/elastic/elastic-agent-libs/logp" +) + +// NewMetricRegistry instantiates a new metric registry. +func NewMetricRegistry(logger *logp.Logger) *MetricRegistry { + return &MetricRegistry{ + logger: logger, + collectionsInfo: make(map[string]MetricCollectionInfo), + jitter: 1 * time.Second, + } +} + +// MetricRegistry keeps track of the last time a metric was collected and +// the time grain used. +// +// This is used to avoid collecting the same metric values over and over again +// when the time grain is larger than the collection interval. +type MetricRegistry struct { + logger *logp.Logger + collectionsInfo map[string]MetricCollectionInfo + // The collection period can be jittered by a second. + // We introduce a small jitter to avoid skipping collections + // when the collection period is close (usually < 1s) to the + // time grain start time. + jitter time.Duration +} + +// Update updates the metric registry with the latest timestamp and +// time grain for the given metric. +func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) { + m.collectionsInfo[m.buildMetricKey(metric)] = info +} + +// NeedsUpdate returns true if the metric needs to be collected again +// for the given `referenceTime`. +func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool { + // Build a key to store the metric in the registry. + // The key is a combination of the namespace, + // resource ID and metric names. + metricKey := m.buildMetricKey(metric) + + if lastCollection, exists := m.collectionsInfo[metricKey]; exists { + // Turn the time grain into a duration (for example, PT5M -> 5 minutes). + timeGrainDuration := asDuration(lastCollection.timeGrain) + + // Adjust the last collection time by adding a small jitter to avoid + // skipping collections when the collection period is close (usually < 1s). + timeSinceLastCollection := time.Since(lastCollection.timestamp) + m.jitter + + if timeSinceLastCollection < timeGrainDuration { + m.logger.Debugw( + "MetricRegistry: Metric does not need an update", + "needs_update", false, + "reference_time", referenceTime, + "last_collection_time", lastCollection.timestamp, + "time_since_last_collection_seconds", timeSinceLastCollection.Seconds(), + "time_grain", lastCollection.timeGrain, + "time_grain_duration_seconds", timeGrainDuration.Seconds(), + "resource_id", metric.ResourceId, + "namespace", metric.Namespace, + "aggregation", metric.Aggregations, + "names", strings.Join(metric.Names, ","), + ) + + return false + } + + // The last collection time is before the start time of the time grain, + // it means that the metricset needs to collect the metric values again. + m.logger.Debugw( + "MetricRegistry: Metric needs an update", + "needs_update", true, + "reference_time", referenceTime, + "last_collection_time", lastCollection.timestamp, + "time_since_last_collection_seconds", timeSinceLastCollection.Seconds(), + "time_grain", lastCollection.timeGrain, + "time_grain_duration_seconds", timeGrainDuration.Seconds(), + "resource_id", metric.ResourceId, + "namespace", metric.Namespace, + "aggregation", metric.Aggregations, + "names", strings.Join(metric.Names, ","), + ) + + return true + } + + // If the metric is not in the registry, it means that it has never + // been collected before. + // + // In this case, we need to collect the metric. + m.logger.Debugw( + "MetricRegistry: Metric needs an update (no collection info in the metric registry)", + "needs_update", true, + "reference_time", referenceTime, + "resource_id", metric.ResourceId, + "namespace", metric.Namespace, + "aggregation", metric.Aggregations, + "names", strings.Join(metric.Names, ","), + ) + + return true +} + +// buildMetricKey builds a key for the metric registry. +// +// The key is a combination of the namespace, resource ID and metric names. +func (m *MetricRegistry) buildMetricKey(metric Metric) string { + keyComponents := []string{ + metric.Namespace, + metric.ResourceId, + } + keyComponents = append(keyComponents, metric.Names...) + + return strings.Join(keyComponents, ",") +} diff --git a/x-pack/metricbeat/module/azure/metric_registry_test.go b/x-pack/metricbeat/module/azure/metric_registry_test.go new file mode 100644 index 000000000000..a0ecdc84b85d --- /dev/null +++ b/x-pack/metricbeat/module/azure/metric_registry_test.go @@ -0,0 +1,93 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package azure + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" +) + +func TestNewMetricRegistry(t *testing.T) { + logger := logp.NewLogger("test azure monitor") + + t.Run("Collect metrics with a regular 5 minutes period", func(t *testing.T) { + metricRegistry := NewMetricRegistry(logger) + + // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time + lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T16:37:50.000Z") + + // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time + referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T16:42:50.000Z") + + metric := Metric{ + ResourceId: "test", + Namespace: "test", + } + metricCollectionInfo := MetricCollectionInfo{ + timeGrain: "PT5M", + timestamp: lastCollectionAt, + } + + metricRegistry.Update(metric, metricCollectionInfo) + + needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) + + assert.True(t, needsUpdate, "metric should need update") + }) + + t.Run("Collect metrics using a period 3 seconds longer than previous", func(t *testing.T) { + metricRegistry := NewMetricRegistry(logger) + + // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time + lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T16:37:50.000Z") + + // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time + referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T16:42:53.000Z") + + metric := Metric{ + ResourceId: "test", + Namespace: "test", + } + metricCollectionInfo := MetricCollectionInfo{ + timeGrain: "PT5M", + timestamp: lastCollectionAt, + } + + metricRegistry.Update(metric, metricCollectionInfo) + + needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) + + assert.True(t, needsUpdate, "metric should need update") + }) + + t.Run("Collect metrics using a period (1 second) shorter than previous", func(t *testing.T) { + metricRegistry := NewMetricRegistry(logger) + + // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time + referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z") + + // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time + lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T10:53:34.000Z") + + metric := Metric{ + ResourceId: "test", + Namespace: "test", + } + metricCollectionInfo := MetricCollectionInfo{ + timeGrain: "PT5M", + timestamp: lastCollectionAt, + } + + metricRegistry.Update(metric, metricCollectionInfo) + + needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) + + assert.True(t, needsUpdate, "metric should not need update") + }) +} diff --git a/x-pack/metricbeat/module/azure/resources.go b/x-pack/metricbeat/module/azure/resources.go index 0a723c82bd5a..6a633663cb1c 100644 --- a/x-pack/metricbeat/module/azure/resources.go +++ b/x-pack/metricbeat/module/azure/resources.go @@ -38,7 +38,7 @@ type Metric struct { Values []MetricValue TimeGrain string ResourceId string - // ResourceSubId is used for the metric values api as namespaces can apply to sub resrouces ex. storage account: container, blob, vm scaleset: vms + // ResourceSubId is used for the metric values api as namespaces can apply to sub resources ex. storage account: container, blob, vm scaleset: vms ResourceSubId string }