Skip to content

Commit

Permalink
Fix the reference time rounding on Azure Metrics (#37365)
Browse files Browse the repository at this point in the history
### What

Change the `MetricRegistry.NeedsUpdate()` method to decide whether to collect the metrics by comparing the collection interval with the time grain. 

If the time since the last collection < time grain duration, then the metrics skip the collection.

For example, given the following scenario:

#### Scenario A: collect PT1M metrics every 60s

- time grain: PT1M (one minute, or 60s)
- collection interval: 60s

In this case, the time since the last collection is never shorter than the time grain, so the metricset fetch metric values on every collection.

#### Scenario B: collect PT15M metrics every 60s

- time grain: PT5M (five minutes, or 300s)
- collection interval: 60s

In this case, the time since the last collection is shorter (60s, 120s, 180s, 240s) than the time grain for four collections. The metricset fetch metric values every five collections.

#### The jitter

During our tests, we noticed the collection scheduling had some variations, causing the time since the last collection to be shorter than expected by a few milliseconds. To compensate for these scheduling fluctuations, the function also adds a short jitter duration (1 second) to avoid false positives due to small fluctuations in collection scheduling. 

### Why

During a testing session on 8.11.2, we [noticed](#37204 (comment)) one out of four agents skipped some metrics collections.

The debug logs revealed the metricset skipped collections due to a 1-second difference between the reference time for the current and previous collections (299s instead of 300s).

![CleanShot 2023-12-08 at 20 13 19](https://github.com/elastic/beats/assets/25941/dc3d5040-c89b-47d2-a86a-124eb838ca36)

The 1-second difference may happen due to an inaccurate rounding in the reference time.

For example, suppose the following two events occur:

1. Metricbeat calls `Fetch()` on the metricset a few milliseconds earlier than in the previous collection.
2. The timestamp is 2023-12-08T10:58:32.999Z. 

In this case, the reference time becomes 2023-12-08T10:58:32.000Z due to the truncation.

This problem happened to one test agent. However, if it happens to one agent, it can happen to others.

### Extended Structured Logging

We also added new fields to the debug structured logs:

```shell
$ cat metricbeat.log.ndjson | grep "MetricRegistry" | head -n 1 | jq                                                                                                                                                    
{
  "log.level": "debug",
  "@timestamp": "2024-01-05T15:03:12.235+0100",
  "log.logger": "azure monitor client",
  "log.origin": {
    "function": "github.com/elastic/beats/v7/x-pack/metricbeat/module/azure.(*MetricRegistry).NeedsUpdate",
    "file.name": "azure/metric_registry.go",
    "file.line": 80
  },
  "message": "MetricRegistry: Metric needs an update",
  "service.name": "metricbeat",
  "needs_update": true,
  "reference_time": "2024-01-05T14:03:07.197Z",
  "last_collection_time": "2024-01-05T14:02:07.199Z",
  "time_since_last_collection_seconds": 66.035681,
  "time_grain": "PT1M",
  "time_grain_duration_seconds": 60,
  "resource_id": "/subscriptions/123/resourceGroups/crest-test-lens-migration/providers/Microsoft.Compute/virtualMachines/rajvi-test-vm",
  "namespace": "Microsoft.Compute/virtualMachines",
  "aggregation": "Total",
  "names": "Network In,Network Out,Disk Read Bytes,Disk Write Bytes,Network In Total,Network Out Total",
  "ecs.version": "1.6.0"
}
```

Here's an example using `jq`:

```shell
$ cat metricbeat.log.ndjson | grep "MetricRegistry" | jq -r  '[.namespace, .aggregation, .needs_update, .reference_time, .last_collection_time//"na", .time_since_last_collection_seconds//"na", .time_grain_duration_seconds//"na", .time_grain] | @TSV' | grep Microsoft.Compute/virtualMachines

.aggregation                            aggregation   .needs_update   .reference_time                 .last_collection_time           time_since_last_collection_seconds .time_grain_duration_seconds .time_grain
Microsoft.Compute/virtualMachines       Average       true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        60.999661                          60                           PT1M
Microsoft.Compute/virtualMachines       Total         true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        61.795341                          60                           PT1M
Microsoft.Compute/virtualMachines       Average       true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        62.080088                          60                           PT1M
Microsoft.Compute/virtualMachines       Total         true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        64.929579                          60                           PT1M
Microsoft.Compute/virtualMachines       Average       true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        65.632209                          60                           PT1M
Microsoft.Compute/virtualMachines       Total         true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        67.832918                          60                           PT1M
Microsoft.Compute/virtualMachines       Average       true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        68.576239                          60                           PT1M
Microsoft.Compute/virtualMachines       Total         true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        69.927988                          60                           PT1M
Microsoft.Compute/virtualMachines       Total         true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        70.351148                          60                           PT1M
Microsoft.Compute/virtualMachines       Average       true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        70.872058                          60                           PT1M
Microsoft.Compute/virtualMachines       Average       true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        72.47401                           60                           PT1M
Microsoft.Compute/virtualMachines       Total         true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        72.971242                          60                           PT1M
Microsoft.Compute/virtualMachines       Average       true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        73.143605                          60                           PT1M
Microsoft.Compute/virtualMachines       Total         true            2024-01-05T14:16:07.193Z        2024-01-05T14:15:07.193Z        74.831489                          60                           PT1M
```
  • Loading branch information
zmoog authored Jan 5, 2024
1 parent d1b3277 commit 824dd04
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix the "api-version query parameter (?api-version=) is required for all requests" error in Azure Billing. {pull}37158[37158]
- Add memory hard limit from container metadata and remove usage percentage in AWS Fargate. {pull}37194[37194]
- Ignore parser errors from unsupported metrics types on Prometheus client and continue parsing until EOF is reached {pull}37383[37383]
- Fix the reference time rounding on Azure Metrics {issue}37204[37204] {pull}37365[37365]

*Osquerybeat*

Expand Down
11 changes: 8 additions & 3 deletions x-pack/metricbeat/module/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// depending on metric time grain (check `MetricRegistry`
// for more information).
//
// We truncate the reference time to the second to avoid millisecond
// variations in the collection period causing skipped collections.
referenceTime := time.Now().UTC().Truncate(time.Second)
// We round the reference time to the nearest second to avoid
// millisecond variations in the collection period causing
// skipped collections.
//
// See "Round outer limits" and "Round inner limits" tests in
// the metric_registry_test.go for more information.
//referenceTime := time.Now().UTC().Round(time.Second)
referenceTime := time.Now().UTC()

// Initialize cloud resources and monitor metrics
// information.
Expand Down
104 changes: 0 additions & 104 deletions x-pack/metricbeat/module/azure/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,110 +16,6 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

// NewMetricRegistry instantiates a new metric registry.
func NewMetricRegistry(logger *logp.Logger) *MetricRegistry {
return &MetricRegistry{
logger: logger,
collectionsInfo: make(map[string]MetricCollectionInfo),
}
}

// MetricRegistry keeps track of the last time a metric was collected and
// the time grain used.
//
// This is used to avoid collecting the same metric values over and over again
// when the time grain is larger than the collection interval.
type MetricRegistry struct {
logger *logp.Logger
collectionsInfo map[string]MetricCollectionInfo
}

// Update updates the metric registry with the latest timestamp and
// time grain for the given metric.
func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) {
m.collectionsInfo[m.buildMetricKey(metric)] = info
}

// NeedsUpdate returns true if the metric needs to be collected again
// for the given `referenceTime`.
func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool {
// Build a key to store the metric in the registry.
// The key is a combination of the namespace,
// resource ID and metric names.
metricKey := m.buildMetricKey(metric)

// Get the now time in UTC, only to be used for logging.
// It's interesting to see when the registry evaluate each
// metric in relation to the reference time.
now := time.Now().UTC()

if collection, exists := m.collectionsInfo[metricKey]; exists {
// Turn the time grain into a duration (for example, PT5M -> 5 minutes).
timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain)

// Calculate the start time of the time grain in relation to
// the reference time.
timeGrainStartTime := referenceTime.Add(-timeGrainDuration)

// If the last collection time is after the start time of the time grain,
// it means that we already have a value for the given time grain.
//
// In this case, the metricset does not need to collect the metric
// values again.
if collection.timestamp.After(timeGrainStartTime) {
m.logger.Debugw(
"MetricRegistry: Metric does not need an update",
"needs_update", false,
"reference_time", referenceTime,
"now", now,
"time_grain_start_time", timeGrainStartTime,
"last_collection_at", collection.timestamp,
)

return false
}

// The last collection time is before the start time of the time grain,
// it means that the metricset needs to collect the metric values again.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"now", now,
"time_grain_start_time", timeGrainStartTime,
"last_collection_at", collection.timestamp,
)

return true
}

// If the metric is not in the registry, it means that it has never
// been collected before.
//
// In this case, we need to collect the metric.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"now", now,
)

return true
}

// buildMetricKey builds a key for the metric registry.
//
// The key is a combination of the namespace, resource ID and metric names.
func (m *MetricRegistry) buildMetricKey(metric Metric) string {
keyComponents := []string{
metric.Namespace,
metric.ResourceId,
}
keyComponents = append(keyComponents, metric.Names...)

return strings.Join(keyComponents, ",")
}

// MetricCollectionInfo contains information about the last time
// a metric was collected and the time grain used.
type MetricCollectionInfo struct {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/azure/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ func compareMetricValues(metVal *float64, metricVal *float64) bool {
return false
}

// convertTimeGrainToDuration converts the Azure time grain options to the equivalent
// asDuration converts the Azure time grain options to the equivalent
// `time.Duration` value.
//
// For example, converts "PT1M" to `time.Minute`.
//
// See https://docs.microsoft.com/en-us/azure/azure-monitor/platform/metrics-supported#time-grain
// for more information.
func convertTimeGrainToDuration(timeGrain string) time.Duration {
func asDuration(timeGrain string) time.Duration {
var duration time.Duration
switch timeGrain {
case "PT1M":
Expand Down
125 changes: 125 additions & 0 deletions x-pack/metricbeat/module/azure/metric_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package azure

import (
"strings"
"time"

"github.com/elastic/elastic-agent-libs/logp"
)

// NewMetricRegistry instantiates a new metric registry.
func NewMetricRegistry(logger *logp.Logger) *MetricRegistry {
return &MetricRegistry{
logger: logger,
collectionsInfo: make(map[string]MetricCollectionInfo),
jitter: 1 * time.Second,
}
}

// MetricRegistry keeps track of the last time a metric was collected and
// the time grain used.
//
// This is used to avoid collecting the same metric values over and over again
// when the time grain is larger than the collection interval.
type MetricRegistry struct {
logger *logp.Logger
collectionsInfo map[string]MetricCollectionInfo
// The collection period can be jittered by a second.
// We introduce a small jitter to avoid skipping collections
// when the collection period is close (usually < 1s) to the
// time grain start time.
jitter time.Duration
}

// Update updates the metric registry with the latest timestamp and
// time grain for the given metric.
func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) {
m.collectionsInfo[m.buildMetricKey(metric)] = info
}

// NeedsUpdate returns true if the metric needs to be collected again
// for the given `referenceTime`.
func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool {
// Build a key to store the metric in the registry.
// The key is a combination of the namespace,
// resource ID and metric names.
metricKey := m.buildMetricKey(metric)

if lastCollection, exists := m.collectionsInfo[metricKey]; exists {
// Turn the time grain into a duration (for example, PT5M -> 5 minutes).
timeGrainDuration := asDuration(lastCollection.timeGrain)

// Adjust the last collection time by adding a small jitter to avoid
// skipping collections when the collection period is close (usually < 1s).
timeSinceLastCollection := time.Since(lastCollection.timestamp) + m.jitter

if timeSinceLastCollection < timeGrainDuration {
m.logger.Debugw(
"MetricRegistry: Metric does not need an update",
"needs_update", false,
"reference_time", referenceTime,
"last_collection_time", lastCollection.timestamp,
"time_since_last_collection_seconds", timeSinceLastCollection.Seconds(),
"time_grain", lastCollection.timeGrain,
"time_grain_duration_seconds", timeGrainDuration.Seconds(),
"resource_id", metric.ResourceId,
"namespace", metric.Namespace,
"aggregation", metric.Aggregations,
"names", strings.Join(metric.Names, ","),
)

return false
}

// The last collection time is before the start time of the time grain,
// it means that the metricset needs to collect the metric values again.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"last_collection_time", lastCollection.timestamp,
"time_since_last_collection_seconds", timeSinceLastCollection.Seconds(),
"time_grain", lastCollection.timeGrain,
"time_grain_duration_seconds", timeGrainDuration.Seconds(),
"resource_id", metric.ResourceId,
"namespace", metric.Namespace,
"aggregation", metric.Aggregations,
"names", strings.Join(metric.Names, ","),
)

return true
}

// If the metric is not in the registry, it means that it has never
// been collected before.
//
// In this case, we need to collect the metric.
m.logger.Debugw(
"MetricRegistry: Metric needs an update (no collection info in the metric registry)",
"needs_update", true,
"reference_time", referenceTime,
"resource_id", metric.ResourceId,
"namespace", metric.Namespace,
"aggregation", metric.Aggregations,
"names", strings.Join(metric.Names, ","),
)

return true
}

// buildMetricKey builds a key for the metric registry.
//
// The key is a combination of the namespace, resource ID and metric names.
func (m *MetricRegistry) buildMetricKey(metric Metric) string {
keyComponents := []string{
metric.Namespace,
metric.ResourceId,
}
keyComponents = append(keyComponents, metric.Names...)

return strings.Join(keyComponents, ",")
}
93 changes: 93 additions & 0 deletions x-pack/metricbeat/module/azure/metric_registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package azure

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"
)

func TestNewMetricRegistry(t *testing.T) {
logger := logp.NewLogger("test azure monitor")

t.Run("Collect metrics with a regular 5 minutes period", func(t *testing.T) {
metricRegistry := NewMetricRegistry(logger)

// Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time
lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T16:37:50.000Z")

// Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time
referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T16:42:50.000Z")

metric := Metric{
ResourceId: "test",
Namespace: "test",
}
metricCollectionInfo := MetricCollectionInfo{
timeGrain: "PT5M",
timestamp: lastCollectionAt,
}

metricRegistry.Update(metric, metricCollectionInfo)

needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric)

assert.True(t, needsUpdate, "metric should need update")
})

t.Run("Collect metrics using a period 3 seconds longer than previous", func(t *testing.T) {
metricRegistry := NewMetricRegistry(logger)

// Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time
lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T16:37:50.000Z")

// Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time
referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T16:42:53.000Z")

metric := Metric{
ResourceId: "test",
Namespace: "test",
}
metricCollectionInfo := MetricCollectionInfo{
timeGrain: "PT5M",
timestamp: lastCollectionAt,
}

metricRegistry.Update(metric, metricCollectionInfo)

needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric)

assert.True(t, needsUpdate, "metric should need update")
})

t.Run("Collect metrics using a period (1 second) shorter than previous", func(t *testing.T) {
metricRegistry := NewMetricRegistry(logger)

// Create a referenceTime parsing 2023-12-08T16:42:50.000Z into a time.Time
referenceTime, _ := time.Parse(time.RFC3339, "2023-12-08T10:58:33.000Z")

// Create a lastCollectionAt parsing the string 2023-12-08T16:37:50.000Z into a time.Time
lastCollectionAt, _ := time.Parse(time.RFC3339, "2023-12-08T10:53:34.000Z")

metric := Metric{
ResourceId: "test",
Namespace: "test",
}
metricCollectionInfo := MetricCollectionInfo{
timeGrain: "PT5M",
timestamp: lastCollectionAt,
}

metricRegistry.Update(metric, metricCollectionInfo)

needsUpdate := metricRegistry.NeedsUpdate(referenceTime, metric)

assert.True(t, needsUpdate, "metric should not need update")
})
}
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/azure/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Metric struct {
Values []MetricValue
TimeGrain string
ResourceId string
// ResourceSubId is used for the metric values api as namespaces can apply to sub resrouces ex. storage account: container, blob, vm scaleset: vms
// ResourceSubId is used for the metric values api as namespaces can apply to sub resources ex. storage account: container, blob, vm scaleset: vms
ResourceSubId string
}

Expand Down

0 comments on commit 824dd04

Please sign in to comment.