Skip to content

Commit

Permalink
Add a self pruning layer to gcp exporter delta stores (#917)
Browse files Browse the repository at this point in the history
* Add a self pruning layer to gcp exporter delta stores

* Refactor pruning window implementation and tests to catch timing bugs
  • Loading branch information
kgeckhart authored Jun 3, 2024
1 parent 0c51b19 commit 42cb22e
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ Main (unreleased)

- Added support for NS records to `discovery.dns`. (@djcode)

- Improved clustering use cases for tracking GCP delta metrics in the `prometheus.exporter.gcp` component. (@kgeckhart)

### Bugfixes

- Fixed an issue with `prometheus.scrape` in which targets that move from one
Expand Down
33 changes: 30 additions & 3 deletions internal/static/integrations/gcp_exporter/gcp_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/api/option"
"gopkg.in/yaml.v2"

"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/static/integrations"
integrations_v2 "github.com/grafana/alloy/internal/static/integrations/v2"
"github.com/grafana/alloy/internal/static/integrations/v2/metricsutils"
Expand Down Expand Up @@ -85,7 +86,11 @@ func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error)
}

var gcpCollectors []prometheus.Collector
var counterStores []*SelfPruningDeltaStore[collectors.ConstMetric]
var histogramStores []*SelfPruningDeltaStore[collectors.HistogramMetric]
for _, projectID := range c.ProjectIDs {
counterStore := NewSelfPruningDeltaStore[collectors.ConstMetric](l, delta.NewInMemoryCounterStore(l, 30*time.Minute))
histogramStore := NewSelfPruningDeltaStore[collectors.HistogramMetric](l, delta.NewInMemoryHistogramStore(l, 30*time.Minute))
monitoringCollector, err := collectors.NewMonitoringCollector(
projectID,
svc,
Expand All @@ -106,17 +111,39 @@ func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error)
AggregateDeltas: true,
},
l,
delta.NewInMemoryCounterStore(l, 30*time.Minute),
delta.NewInMemoryHistogramStore(l, 30*time.Minute),
counterStore,
histogramStore,
)
if err != nil {
return nil, fmt.Errorf("failed to create monitoring collector: %w", err)
}
counterStores = append(counterStores, counterStore)
histogramStores = append(histogramStores, histogramStore)
gcpCollectors = append(gcpCollectors, monitoringCollector)
}

// run is the integration's background loop: every five minutes it asks each counter and histogram delta store to
// prune itself, and it exits with the context's error once ctx is done.
run := func(ctx context.Context) error {
	pruneTicker := time.NewTicker(5 * time.Minute)
	defer pruneTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-pruneTicker.C:
			storeCount := len(counterStores) + len(histogramStores)
			level.Debug(l).Log("msg", "Starting delta store pruning", "number_of_stores", storeCount)
			for i := range counterStores {
				counterStores[i].Prune(ctx)
			}
			for i := range histogramStores {
				histogramStores[i].Prune(ctx)
			}
			level.Debug(l).Log("msg", "Finished delta store pruning", "number_of_stores", storeCount)
		}
	}
}

return integrations.NewCollectorIntegration(
c.Name(), integrations.WithCollectors(gcpCollectors...),
c.Name(), integrations.WithCollectors(gcpCollectors...), integrations.WithRunner(run),
), nil
}

Expand Down
103 changes: 103 additions & 0 deletions internal/static/integrations/gcp_exporter/self_pruning_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package gcp_exporter

import (
"context"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus-community/stackdriver_exporter/collectors"
"github.com/tilinna/clock"
"go.uber.org/atomic"
"google.golang.org/api/monitoring/v3"
)

// FiveMinutePruneWindow is five minutes expressed in seconds so it can be compared directly against differences of
// Unix timestamps. shouldPrune reports true once strictly more than this many seconds have elapsed since the last
// ListMetrics call on a SelfPruningDeltaStore.
const FiveMinutePruneWindow int64 = 5 * 60

// CounterOrHistogramStore is the minimal delta store contract that SelfPruningDeltaStore wraps. It is generic over
// the stored metric type so the same wrapper can sit in front of both counter and histogram stores.
type CounterOrHistogramStore[T CounterOrHistogram] interface {
// Increment hands currentValue for the given metricDescriptor to the store.
Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *T)
// ListMetrics returns the metrics the store holds for metricDescriptorName. SelfPruningDeltaStore.Prune relies on
// implementations also dropping data outside their configured TTL as a side effect of this call.
ListMetrics(metricDescriptorName string) []*T
}

// CounterOrHistogram is a type constraint limiting the generic stores in this file to the two metric value types
// they are instantiated with: collectors.ConstMetric and collectors.HistogramMetric.
type CounterOrHistogram interface {
collectors.ConstMetric | collectors.HistogramMetric
}

// SelfPruningDeltaStore decorates a CounterOrHistogramStore, remembering which metric descriptor names currently
// have data and when ListMetrics was last called, so that Prune can keep expiring data in the wrapped store when
// nothing else is calling ListMetrics.
type SelfPruningDeltaStore[T CounterOrHistogram] struct {
// wrapping is the underlying store that Increment, ListMetrics and Prune delegate to.
wrapping CounterOrHistogramStore[T]
// mux guards trackedMetricDescriptorNames.
mux sync.Mutex
// trackedMetricDescriptorNames is the set of descriptor names whose most recent listing returned results.
trackedMetricDescriptorNames map[string]struct{}
// lastListOperationTime is the Unix time, in seconds, of the most recent ListMetrics call on this wrapper.
// The zero value means ListMetrics has not been called yet.
lastListOperationTime atomic.Int64
// logger receives debug output describing pruning progress.
logger log.Logger
}

// NewSelfPruningDeltaStore returns a SelfPruningDeltaStore that decorates the given delta store so it can be pruned
// even when the store is not being exercised by regular scrapes.
//
// During normal operation the GCP exporter prunes the wrapped store as a by-product of listing metrics. When the
// exporter runs in clustering mode and has no active GCP targets, that never happens; if targets are assigned again
// later, stale counter values would be reused and could produce invalid rate and increase calculations.
//
// This wrapper is a short term fix until clustering aware components are completed: it makes sure the in-memory
// counters are pruned once an exporter instance no longer has targets assigned.
func NewSelfPruningDeltaStore[T CounterOrHistogram](l log.Logger, wrapping CounterOrHistogramStore[T]) *SelfPruningDeltaStore[T] {
	store := new(SelfPruningDeltaStore[T])
	store.wrapping = wrapping
	store.logger = l
	store.trackedMetricDescriptorNames = make(map[string]struct{})
	return store
}

// Increment forwards metricDescriptor and currentValue to the wrapped store unchanged.
//
// Descriptor tracking is intentionally not updated here: doing so would require taking the mutex in a high
// throughput function. Tracking is maintained by ListMetrics instead.
func (s *SelfPruningDeltaStore[T]) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *T) {
s.wrapping.Increment(metricDescriptor, currentValue)
}

// ListMetrics records the current wall-clock time as the last list operation, fetches the metrics for
// metricDescriptorName from the wrapped store, and then reconciles descriptor tracking with what came back.
// The wrapped store's result is returned as-is.
func (s *SelfPruningDeltaStore[T]) ListMetrics(metricDescriptorName string) []*T {
	s.lastListOperationTime.Store(time.Now().Unix())
	metrics := s.wrapping.ListMetrics(metricDescriptorName)
	hasResults := len(metrics) > 0

	s.mux.Lock()
	defer s.mux.Unlock()

	_, tracked := s.trackedMetricDescriptorNames[metricDescriptorName]
	switch {
	case hasResults && !tracked:
		// First time this descriptor has produced data: start tracking it so Prune will visit it.
		s.trackedMetricDescriptorNames[metricDescriptorName] = struct{}{}
	case !hasResults && tracked:
		// The descriptor has drained; there is nothing left for Prune to expire.
		delete(s.trackedMetricDescriptorNames, metricDescriptorName)
	}

	return metrics
}

// Prune lists every tracked metric descriptor on the wrapped store when ListMetrics has not been called on this
// wrapper for longer than FiveMinutePruneWindow. The current time is taken from the clock carried by ctx.
//
// Descriptors that come back empty are dropped from tracking. The loop stops early if a ListMetrics call arrives
// while pruning is in progress, since regular operation has resumed.
func (s *SelfPruningDeltaStore[T]) Prune(ctx context.Context) {
	now := clock.Now(ctx).Unix()
	if !s.shouldPrune(now) {
		return
	}
	level.Debug(s.logger).Log("msg", "Pruning window breached starting prune")

	s.mux.Lock()
	defer s.mux.Unlock()

	for name := range s.trackedMetricDescriptorNames {
		// A concurrent ListMetrics refreshes the last operation time; once that happens pruning is no longer needed.
		if !s.shouldPrune(now) {
			level.Debug(s.logger).Log("msg", "Store no longer needs pruned stopping")
			break
		}

		// Listing on the wrapped store is what expires data outside its configured TTL, so the call is made purely
		// for that side effect; only the number of remaining metrics is used here.
		remaining := len(s.wrapping.ListMetrics(name))
		if remaining == 0 {
			delete(s.trackedMetricDescriptorNames, name)
		}
		level.Debug(s.logger).Log("msg", "Pruning metric descriptor", "metric_descriptor", name, "metrics_remaining", remaining)
	}
	level.Debug(s.logger).Log("msg", "Pruning finished")
}

// shouldPrune reports whether strictly more than FiveMinutePruneWindow seconds separate now (Unix seconds) from the
// last recorded ListMetrics call.
func (s *SelfPruningDeltaStore[T]) shouldPrune(now int64) bool {
	lastList := s.lastListOperationTime.Load()
	elapsed := now - lastList
	return elapsed > FiveMinutePruneWindow
}
154 changes: 154 additions & 0 deletions internal/static/integrations/gcp_exporter/self_pruning_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package gcp_exporter_test

import (
"context"
"testing"
"time"

"github.com/prometheus-community/stackdriver_exporter/collectors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tilinna/clock"
"golang.org/x/exp/maps"
"google.golang.org/api/monitoring/v3"

"github.com/grafana/alloy/internal/static/integrations/gcp_exporter"
"github.com/grafana/alloy/internal/util"
)

// testStore is a fake delta store for collectors.ConstMetric that counts how it is called so tests can assert on
// delegation and pruning behavior.
type testStore struct {
// incrementCounter is the number of Increment calls received.
incrementCounter int
// descriptorToListMetricsCounter is the number of ListMetrics calls received per metric descriptor name.
descriptorToListMetricsCounter map[string]int
// state holds the metrics ListMetrics returns for each metric descriptor name.
state map[string][]*collectors.ConstMetric
}

// newTestStore builds a testStore with zeroed call counters. When exactly one state map is supplied it becomes the
// store's backing state (shared with the caller, not copied); with any other number of arguments the store starts
// with an empty state.
func newTestStore(state ...map[string][]*collectors.ConstMetric) *testStore {
	store := &testStore{
		descriptorToListMetricsCounter: map[string]int{},
		state:                          map[string][]*collectors.ConstMetric{},
	}
	if len(state) == 1 {
		store.state = state[0]
	}
	return store
}

// Increment ignores its arguments and only counts that it was called.
func (t *testStore) Increment(_ *monitoring.MetricDescriptor, _ *collectors.ConstMetric) {
t.incrementCounter++
}

// ListMetrics counts the call for metricDescriptorName and returns whatever the test state currently holds for it
// (nil when the descriptor has no entry).
func (t *testStore) ListMetrics(metricDescriptorName string) []*collectors.ConstMetric {
	metrics := t.state[metricDescriptorName]
	t.descriptorToListMetricsCounter[metricDescriptorName]++
	return metrics
}

// TestSelfPruningDeltaStore_Increment_Delegates checks that a single Increment on the pruning store results in
// exactly one Increment on the wrapped store.
func TestSelfPruningDeltaStore_Increment_Delegates(t *testing.T) {
	wrapped := newTestStore()
	store := gcp_exporter.NewSelfPruningDeltaStore[collectors.ConstMetric](util.TestAlloyLogger(t), wrapped)

	store.Increment(
		&monitoring.MetricDescriptor{Name: "test-descriptor"},
		&collectors.ConstMetric{},
	)

	assert.Equal(t, 1, wrapped.incrementCounter)
}

// TestSelfPruningDeltaStore_ListMetrics_Delegates checks that a single ListMetrics on the pruning store results in
// exactly one ListMetrics on the wrapped store, for the same descriptor name and no other.
func TestSelfPruningDeltaStore_ListMetrics_Delegates(t *testing.T) {
	const descriptorName = "test-descriptor"

	wrapped := newTestStore()
	store := gcp_exporter.NewSelfPruningDeltaStore[collectors.ConstMetric](util.TestAlloyLogger(t), wrapped)

	store.ListMetrics(descriptorName)

	calls := wrapped.descriptorToListMetricsCounter
	assert.Len(t, calls, 1)
	assert.Equal(t, 1, calls[descriptorName])
}

// TestSelfPruningDeltaStore_PruningWorkflow is a table-driven test of the interaction between ListMetrics and Prune.
// Each case seeds a fake wrapped store, runs a sequence of calls against the pruning store, and then asserts how
// many times ListMetrics reached the wrapped store for each descriptor.
func TestSelfPruningDeltaStore_PruningWorkflow(t *testing.T) {
// A context whose mock clock is six minutes ahead of the real time at test start. ListMetrics stamps the real
// time, so Prune called with this context sees the five minute prune window as exceeded.
sixMinutesAheadClock := clock.Context(context.Background(), clock.NewMock(time.Now().Add(6*time.Minute)))
type testCase struct {
// name is the subtest name.
name string
// storeState is the initial state handed to the fake wrapped store.
storeState map[string][]*collectors.ConstMetric
// callsToMakeTo performs the scenario; ts is the fake wrapped store so a case can mutate its state mid-run.
callsToMakeTo func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore)
// expectedCallCounts is the exact set of descriptors, and per-descriptor ListMetrics call counts, expected on the
// fake wrapped store afterwards.
expectedCallCounts map[string]int
}
tests := []testCase{
{
name: "does nothing when last operation time does not require pruning",
storeState: map[string][]*collectors.ConstMetric{
"test-descriptor": {{FqName: "test-const-metric"}},
},
callsToMakeTo: func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore) {
// Initialize last operation time
store.ListMetrics("test-descriptor")
// Pruning with a context that carries no mock clock, so the window has not elapsed
store.Prune(context.Background())
},
expectedCallCounts: map[string]int{"test-descriptor": 1}, // Once to init last operation time
},
{
name: "does nothing when no metric descriptors have been tracked",
storeState: map[string][]*collectors.ConstMetric{},
callsToMakeTo: func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore) {
// Initialize last operation time
store.ListMetrics("test-descriptor")
store.Prune(sixMinutesAheadClock)
},
expectedCallCounts: map[string]int{"test-descriptor": 1}, // Once to init last operation time
},
{
name: "will prune outstanding descriptors",
storeState: map[string][]*collectors.ConstMetric{
"test-descriptor": {{FqName: "test-const-metric"}},
},
callsToMakeTo: func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore) {
store.ListMetrics("test-descriptor")
store.Prune(sixMinutesAheadClock)
},
expectedCallCounts: map[string]int{
"test-descriptor": 2, // Once to track it and once to prune it
},
},
{
name: "will stop pruning a descriptor with no results",
storeState: map[string][]*collectors.ConstMetric{
"test-descriptor": {{FqName: "test-const-metric"}},
},
callsToMakeTo: func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore) {
store.ListMetrics("test-descriptor")
// Empty the descriptor so the first Prune untracks it and the second Prune has nothing to visit
ts.state["test-descriptor"] = []*collectors.ConstMetric{}
store.Prune(sixMinutesAheadClock)
store.Prune(sixMinutesAheadClock)
},
expectedCallCounts: map[string]int{
"test-descriptor": 2, // Once to track it and once to prune it
},
},
{
name: "stops tracking descriptors with no results",
storeState: map[string][]*collectors.ConstMetric{
"test-descriptor": {{FqName: "test-const-metric"}},
},
callsToMakeTo: func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore) {
// Track it
store.ListMetrics("test-descriptor")
// Make it empty
ts.state["test-descriptor"] = []*collectors.ConstMetric{}
// Try to untrack it
store.ListMetrics("test-descriptor")
store.Prune(sixMinutesAheadClock)
},
expectedCallCounts: map[string]int{
"test-descriptor": 2, // Once to track it and once to untrack it
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ts := newTestStore(tc.storeState)
store := gcp_exporter.NewSelfPruningDeltaStore[collectors.ConstMetric](
util.TestAlloyLogger(t),
ts)
tc.callsToMakeTo(store, ts)

// The set of descriptors listed on the wrapped store must match exactly, then each count is compared.
require.ElementsMatch(t, maps.Keys(tc.expectedCallCounts), maps.Keys(ts.descriptorToListMetricsCounter))
for descriptor, callCount := range tc.expectedCallCounts {
assert.Equal(t, callCount, ts.descriptorToListMetricsCounter[descriptor], "descriptor %s had an incorrect call count", descriptor)
}
})
}
}

0 comments on commit 42cb22e

Please sign in to comment.