From 61337102fc51ca447027380b50596966ba88b82b Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Sat, 6 Jan 2024 01:37:20 +0100 Subject: [PATCH] Fix the reference time rounding on Azure Metrics (#37365) (#37568) ### What Change the `MetricRegistry.NeedsUpdate()` method to decide whether to collect the metrics by comparing the collection interval with the time grain. If the time since the last collection < time grain duration, then the metrics skip the collection. For example, given the following scenario: #### Scenario A: collect PT1M metrics every 60s - time grain: PT1M (one minute, or 60s) - collection interval: 60s In this case, the time since the last collection is never shorter than the time grain, so the metricset fetch metric values on every collection. #### Scenario B: collect PT15M metrics every 60s - time grain: PT5M (five minutes, or 300s) - collection interval: 60s In this case, the time since the last collection is shorter (60s, 120s, 180s, 240s) than the time grain for four collections. The metricset fetch metric values every five collections. #### The jitter During our tests, we noticed the collection scheduling had some variations, causing the time since the last collection to be shorter than expected by a few milliseconds. To compensate for these scheduling fluctuations, the function also adds a short jitter duration (1 second) to avoid false positives due to small fluctuations in collection scheduling. ### Why During a testing session on 8.11.2, we [noticed](https://github.com/elastic/beats/issues/37204#issuecomment-1847023185) one out of four agents skipped some metrics collections. The debug logs revealed the metricset skipped collections due to a 1-second difference between the reference time for the current and previous collections (299s instead of 300s). ![CleanShot 2023-12-08 at 20 13 19](https://github.com/elastic/beats/assets/25941/dc3d5040-c89b-47d2-a86a-124eb838ca36) The 1-second difference may happen due to an inaccurate rounding in the reference time. For example, suppose the following two events occur: 1. Metricbeat calls `Fetch()` on the metricset a few milliseconds earlier than in the previous collection. 2. The timestamp is 2023-12-08T10:58:32.999Z. In this case, the reference time becomes 2023-12-08T10:58:32.000Z due to the truncation. This problem happened to one test agent. However, if it happens to one agent, it can happen to others. ### Extended Structured Logging We also added new fields to the debug structured logs: ```shell $ cat metricbeat.log.ndjson | grep "MetricRegistry" | head -n 1 | jq { "log.level": "debug", "@timestamp": "2024-01-05T15:03:12.235+0100", "log.logger": "azure monitor client", "log.origin": { "function": "github.com/elastic/beats/v7/x-pack/metricbeat/module/azure.(*MetricRegistry).NeedsUpdate", "file.name": "azure/metric_registry.go", "file.line": 80 }, "message": "MetricRegistry: Metric needs an update", "service.name": "metricbeat", "needs_update": true, "reference_time": "2024-01-05T14:03:07.197Z", "last_collection_time": "2024-01-05T14:02:07.199Z", "time_since_last_collection_seconds": 66.035681, "time_grain": "PT1M", "time_grain_duration_seconds": 60, "resource_id": "/subscriptions/123/resourceGroups/crest-test-lens-migration/providers/Microsoft.Compute/virtualMachines/rajvi-test-vm", "namespace": "Microsoft.Compute/virtualMachines", "aggregation": "Total", "names": "Network In,Network Out,Disk Read Bytes,Disk Write Bytes,Network In Total,Network Out Total", "ecs.version": "1.6.0" } ``` Here's an example using `jq`: ```shell $ cat metricbeat.log.ndjson | grep "MetricRegistry" | jq -r '[.namespace, .aggregation, .needs_update, .reference_time, .last_collection_time//"na", .time_since_last_collection_seconds//"na", .time_grain_duration_seconds//"na", .time_grain] | @tsv' | grep Microsoft.Compute/virtualMachines .aggregation aggregation .needs_update .reference_time .last_collection_time time_since_last_collection_seconds .time_grain_duration_seconds .time_grain Microsoft.Compute/virtualMachines Average true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 60.999661 60 PT1M Microsoft.Compute/virtualMachines Total true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 61.795341 60 PT1M Microsoft.Compute/virtualMachines Average true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 62.080088 60 PT1M Microsoft.Compute/virtualMachines Total true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 64.929579 60 PT1M Microsoft.Compute/virtualMachines Average true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 65.632209 60 PT1M Microsoft.Compute/virtualMachines Total true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 67.832918 60 PT1M Microsoft.Compute/virtualMachines Average true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 68.576239 60 PT1M Microsoft.Compute/virtualMachines Total true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 69.927988 60 PT1M Microsoft.Compute/virtualMachines Total true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 70.351148 60 PT1M Microsoft.Compute/virtualMachines Average true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 70.872058 60 PT1M Microsoft.Compute/virtualMachines Average true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 72.47401 60 PT1M Microsoft.Compute/virtualMachines Total true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 72.971242 60 PT1M Microsoft.Compute/virtualMachines Average true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 73.143605 60 PT1M Microsoft.Compute/virtualMachines Total true 2024-01-05T14:16:07.193Z 2024-01-05T14:15:07.193Z 74.831489 60 PT1M ``` (cherry picked from commit 824dd04debeea12a34408bb5840609d1a57222a2) Co-authored-by: Maurizio Branca --- CHANGELOG.next.asciidoc | 1 + x-pack/metricbeat/module/azure/azure.go | 11 +- x-pack/metricbeat/module/azure/client.go | 104 --------------- .../metricbeat/module/azure/client_utils.go | 4 +- .../module/azure/metric_registry.go | 125 ++++++++++++++++++ .../module/azure/metric_registry_test.go | 93 +++++++++++++ x-pack/metricbeat/module/azure/resources.go | 2 +- 7 files changed, 230 insertions(+), 110 deletions(-) create mode 100644 x-pack/metricbeat/module/azure/metric_registry.go create mode 100644 x-pack/metricbeat/module/azure/metric_registry_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f1a5919087d0..6ecaf3350f5a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -102,6 +102,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix the "api-version query parameter (?api-version=) is required for all requests" error in Azure Billing. {pull}37158[37158] - Add memory hard limit from container metadata and remove usage percentage in AWS Fargate. {pull}37194[37194] - Ignore parser errors from unsupported metrics types on Prometheus client and continue parsing until EOF is reached {pull}37383[37383] +- Fix the reference time rounding on Azure Metrics {issue}37204[37204] {pull}37365[37365] *Osquerybeat* diff --git a/x-pack/metricbeat/module/azure/azure.go b/x-pack/metricbeat/module/azure/azure.go index 7812feed838c..dd7f121b2697 100644 --- a/x-pack/metricbeat/module/azure/azure.go +++ b/x-pack/metricbeat/module/azure/azure.go @@ -96,9 +96,14 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { // depending on metric time grain (check `MetricRegistry` // for more information). // - // We truncate the reference time to the second to avoid millisecond - // variations in the collection period causing skipped collections. - referenceTime := time.Now().UTC().Truncate(time.Second) + // We round the reference time to the nearest second to avoid + // millisecond variations in the collection period causing + // skipped collections. + // + // See "Round outer limits" and "Round inner limits" tests in + // the metric_registry_test.go for more information. + //referenceTime := time.Now().UTC().Round(time.Second) + referenceTime := time.Now().UTC() // Initialize cloud resources and monitor metrics // information. diff --git a/x-pack/metricbeat/module/azure/client.go b/x-pack/metricbeat/module/azure/client.go index ce9a6cb824fc..3b22a5713cd3 100644 --- a/x-pack/metricbeat/module/azure/client.go +++ b/x-pack/metricbeat/module/azure/client.go @@ -16,110 +16,6 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -// NewMetricRegistry instantiates a new metric registry. -func NewMetricRegistry(logger *logp.Logger) *MetricRegistry { - return &MetricRegistry{ - logger: logger, - collectionsInfo: make(map[string]MetricCollectionInfo), - } -} - -// MetricRegistry keeps track of the last time a metric was collected and -// the time grain used. -// -// This is used to avoid collecting the same metric values over and over again -// when the time grain is larger than the collection interval. -type MetricRegistry struct { - logger *logp.Logger - collectionsInfo map[string]MetricCollectionInfo -} - -// Update updates the metric registry with the latest timestamp and -// time grain for the given metric. -func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) { - m.collectionsInfo[m.buildMetricKey(metric)] = info -} - -// NeedsUpdate returns true if the metric needs to be collected again -// for the given `referenceTime`. -func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool { - // Build a key to store the metric in the registry. - // The key is a combination of the namespace, - // resource ID and metric names. - metricKey := m.buildMetricKey(metric) - - // Get the now time in UTC, only to be used for logging. - // It's interesting to see when the registry evaluate each - // metric in relation to the reference time. - now := time.Now().UTC() - - if collection, exists := m.collectionsInfo[metricKey]; exists { - // Turn the time grain into a duration (for example, PT5M -> 5 minutes). - timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain) - - // Calculate the start time of the time grain in relation to - // the reference time. - timeGrainStartTime := referenceTime.Add(-timeGrainDuration) - - // If the last collection time is after the start time of the time grain, - // it means that we already have a value for the given time grain. - // - // In this case, the metricset does not need to collect the metric - // values again. - if collection.timestamp.After(timeGrainStartTime) { - m.logger.Debugw( - "MetricRegistry: Metric does not need an update", - "needs_update", false, - "reference_time", referenceTime, - "now", now, - "time_grain_start_time", timeGrainStartTime, - "last_collection_at", collection.timestamp, - ) - - return false - } - - // The last collection time is before the start time of the time grain, - // it means that the metricset needs to collect the metric values again. - m.logger.Debugw( - "MetricRegistry: Metric needs an update", - "needs_update", true, - "reference_time", referenceTime, - "now", now, - "time_grain_start_time", timeGrainStartTime, - "last_collection_at", collection.timestamp, - ) - - return true - } - - // If the metric is not in the registry, it means that it has never - // been collected before. - // - // In this case, we need to collect the metric. - m.logger.Debugw( - "MetricRegistry: Metric needs an update", - "needs_update", true, - "reference_time", referenceTime, - "now", now, - ) - - return true -} - -// buildMetricKey builds a key for the metric registry. -// -// The key is a combination of the namespace, resource ID and metric names. -func (m *MetricRegistry) buildMetricKey(metric Metric) string { - keyComponents := []string{ - metric.Namespace, - metric.ResourceId, - } - keyComponents = append(keyComponents, metric.Names...) - - return strings.Join(keyComponents, ",") -} - // MetricCollectionInfo contains information about the last time // a metric was collected and the time grain used. type MetricCollectionInfo struct { diff --git a/x-pack/metricbeat/module/azure/client_utils.go b/x-pack/metricbeat/module/azure/client_utils.go index 986125ba6b68..114ccd95baf2 100644 --- a/x-pack/metricbeat/module/azure/client_utils.go +++ b/x-pack/metricbeat/module/azure/client_utils.go @@ -135,14 +135,14 @@ func compareMetricValues(metVal *float64, metricVal *float64) bool { return false } -// convertTimeGrainToDuration converts the Azure time grain options to the equivalent +// asDuration converts the Azure time grain options to the equivalent // `time.Duration` value. // // For example, converts "PT1M" to `time.Minute`. // // See https://docs.microsoft.com/en-us/azure/azure-monitor/platform/metrics-supported#time-grain // for more information. -func convertTimeGrainToDuration(timeGrain string) time.Duration { +func asDuration(timeGrain string) time.Duration { var duration time.Duration switch timeGrain { case "PT1M": diff --git a/x-pack/metricbeat/module/azure/metric_registry.go b/x-pack/metricbeat/module/azure/metric_registry.go new file mode 100644 index 000000000000..cdaa9496b5d6 --- /dev/null +++ b/x-pack/metricbeat/module/azure/metric_registry.go @@ -0,0 +1,125 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package azure + +import ( + "strings" + "time" + + "github.com/elastic/elastic-agent-libs/logp" +) + +// NewMetricRegistry instantiates a new metric registry. +func NewMetricRegistry(logger *logp.Logger) *MetricRegistry { + return &MetricRegistry{ + logger: logger, + collectionsInfo: make(map[string]MetricCollectionInfo), + jitter: 1 * time.Second, + } +} + +// MetricRegistry keeps track of the last time a metric was collected and +// the time grain used. +// +// This is used to avoid collecting the same metric values over and over again +// when the time grain is larger than the collection interval. +type MetricRegistry struct { + logger *logp.Logger + collectionsInfo map[string]MetricCollectionInfo + // The collection period can be jittered by a second. + // We introduce a small jitter to avoid skipping collections + // when the collection period is close (usually < 1s) to the + // time grain start time. + jitter time.Duration +} + +// Update updates the metric registry with the latest timestamp and +// time grain for the given metric. +func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) { + m.collectionsInfo[m.buildMetricKey(metric)] = info +} + +// NeedsUpdate returns true if the metric needs to be collected again +// for the given `referenceTime`. +func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool { + // Build a key to store the metric in the registry. + // The key is a combination of the namespace, + // resource ID and metric names. + metricKey := m.buildMetricKey(metric) + + if lastCollection, exists := m.collectionsInfo[metricKey]; exists { + // Turn the time grain into a duration (for example, PT5M -> 5 minutes). + timeGrainDuration := asDuration(lastCollection.timeGrain) + + // Adjust the last collection time by adding a small jitter to avoid + // skipping collections when the collection period is close (usually < 1s). + timeSinceLastCollection := time.Since(lastCollection.timestamp) + m.jitter + + if timeSinceLastCollection < timeGrainDuration { + m.logger.Debugw( + "MetricRegistry: Metric does not need an update", + "needs_update", false, + "reference_time", referenceTime, + "last_collection_time", lastCollection.timestamp, + "time_since_last_collection_seconds", timeSinceLastCollection.Seconds(), + "time_grain", lastCollection.timeGrain, + "time_grain_duration_seconds", timeGrainDuration.Seconds(), + "resource_id", metric.ResourceId, + "namespace", metric.Namespace, + "aggregation", metric.Aggregations, + "names", strings.Join(metric.Names, ","), + ) + + return false + } + + // The last collection time is before the start time of the time grain, + // it means that the metricset needs to collect the metric values again. + m.logger.Debugw( + "MetricRegistry: Metric needs an update", + "needs_update", true, + "reference_time", referenceTime, + "last_collection_time", lastCollection.timestamp, + "time_since_last_collection_seconds", timeSinceLastCollection.Seconds(), + "time_grain", lastCollection.timeGrain, + "time_grain_duration_seconds", timeGrainDuration.Seconds(), + "resource_id", metric.ResourceId, + "namespace", metric.Namespace, + "aggregation", metric.Aggregations, + "names", strings.Join(metric.Names, ","), + ) + + return true + } + + // If the metric is not in the registry, it means that it has never + // been collected before. + // + // In this case, we need to collect the metric. + m.logger.Debugw( + "MetricRegistry: Metric needs an update (no collection info in the metric registry)", + "needs_update", true, + "reference_time", referenceTime, + "resource_id", metric.ResourceId, + "namespace", metric.Namespace, + "aggregation", metric.Aggregations, + "names", strings.Join(metric.Names, ","), + ) + + return true +} + +// buildMetricKey builds a key for the metric registry. +// +// The key is a combination of the namespace, resource ID and metric names. +func (m *MetricRegistry) buildMetricKey(metric Metric) string { + keyComponents := []string{ + metric.Namespace, + metric.ResourceId, + } + keyComponents = append(keyComponents, metric.Names...) + + return strings.Join(keyComponents, ",") +} diff --git a/x-pack/metricbeat/module/azure/metric_registry_test.go b/x-pack/metricbeat/module/azure/metric_registry_test.go new file mode 100644 index 000000000000..a0ecdc84b85d --- /dev/null +++ b/x-pack/metricbeat/module/azure/metric_registry_test.go @@ -0,0 +1,93 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package azure + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" +) + +func TestNewMetricRegistry(t *testing.T) { + logger := logp.NewLogger("test azure monitor") + + t.Run("Collect metrics with a regular 5 minutes period", func(t *testing.T) { + metricRegistry := NewMetricRegistry(logger) + + // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time + lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T16:37:50.000Z") + + // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time + referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T16:42:50.000Z") + + metric := Metric{ + ResourceId: "test", + Namespace: "test", + } + metricCollectionInfo := MetricCollectionInfo{ + timeGrain: "PT5M", + timestamp: lastCollectionAt, + } + + metricRegistry.Update(metric, metricCollectionInfo) + + needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) + + assert.True(t, needsUpdate, "metric should need update") + }) + + t.Run("Collect metrics using a period 3 seconds longer than previous", func(t *testing.T) { + metricRegistry := NewMetricRegistry(logger) + + // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time + lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T16:37:50.000Z") + + // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time + referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T16:42:53.000Z") + + metric := Metric{ + ResourceId: "test", + Namespace: "test", + } + metricCollectionInfo := MetricCollectionInfo{ + timeGrain: "PT5M", + timestamp: lastCollectionAt, + } + + metricRegistry.Update(metric, metricCollectionInfo) + + needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) + + assert.True(t, needsUpdate, "metric should need update") + }) + + t.Run("Collect metrics using a period (1 second) shorter than previous", func(t *testing.T) { + metricRegistry := NewMetricRegistry(logger) + + // Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time + referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z") + + // Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time + lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T10:53:34.000Z") + + metric := Metric{ + ResourceId: "test", + Namespace: "test", + } + metricCollectionInfo := MetricCollectionInfo{ + timeGrain: "PT5M", + timestamp: lastCollectionAt, + } + + metricRegistry.Update(metric, metricCollectionInfo) + + needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric) + + assert.True(t, needsUpdate, "metric should not need update") + }) +} diff --git a/x-pack/metricbeat/module/azure/resources.go b/x-pack/metricbeat/module/azure/resources.go index 0a723c82bd5a..6a633663cb1c 100644 --- a/x-pack/metricbeat/module/azure/resources.go +++ b/x-pack/metricbeat/module/azure/resources.go @@ -38,7 +38,7 @@ type Metric struct { Values []MetricValue TimeGrain string ResourceId string - // ResourceSubId is used for the metric values api as namespaces can apply to sub resrouces ex. storage account: container, blob, vm scaleset: vms + // ResourceSubId is used for the metric values api as namespaces can apply to sub resources ex. storage account: container, blob, vm scaleset: vms ResourceSubId string }