diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2090bc33b4..c2b3111066 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -63,6 +63,8 @@ Main (unreleased)
 
 - Added support for NS records to `discovery.dns`. (@djcode)
 
+- Improved tracking of GCP delta metrics in the `prometheus.exporter.gcp` component when clustering is enabled. (@kgeckhart)
+
 ### Bugfixes
 
 - Fixed an issue with `prometheus.scrape` in which targets that move from one
diff --git a/internal/static/integrations/gcp_exporter/gcp_exporter.go b/internal/static/integrations/gcp_exporter/gcp_exporter.go
index fed0251416..a574585696 100644
--- a/internal/static/integrations/gcp_exporter/gcp_exporter.go
+++ b/internal/static/integrations/gcp_exporter/gcp_exporter.go
@@ -22,6 +22,7 @@ import (
 	"google.golang.org/api/option"
 	"gopkg.in/yaml.v2"
 
+	"github.com/grafana/alloy/internal/runtime/logging/level"
 	"github.com/grafana/alloy/internal/static/integrations"
 	integrations_v2 "github.com/grafana/alloy/internal/static/integrations/v2"
 	"github.com/grafana/alloy/internal/static/integrations/v2/metricsutils"
@@ -85,7 +86,11 @@ func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error)
 	}
 
 	var gcpCollectors []prometheus.Collector
+	var counterStores []*SelfPruningDeltaStore[collectors.ConstMetric]
+	var histogramStores []*SelfPruningDeltaStore[collectors.HistogramMetric]
 	for _, projectID := range c.ProjectIDs {
+		counterStore := NewSelfPruningDeltaStore[collectors.ConstMetric](l, delta.NewInMemoryCounterStore(l, 30*time.Minute))
+		histogramStore := NewSelfPruningDeltaStore[collectors.HistogramMetric](l, delta.NewInMemoryHistogramStore(l, 30*time.Minute))
 		monitoringCollector, err := collectors.NewMonitoringCollector(
 			projectID,
 			svc,
@@ -106,17 +111,39 @@ func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error)
 				AggregateDeltas: true,
 			},
 			l,
-			delta.NewInMemoryCounterStore(l, 30*time.Minute),
-			delta.NewInMemoryHistogramStore(l, 30*time.Minute),
+			counterStore,
+			histogramStore,
 		)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create monitoring collector: %w", err)
 		}
+		counterStores = append(counterStores, counterStore)
+		histogramStores = append(histogramStores, histogramStore)
 		gcpCollectors = append(gcpCollectors, monitoringCollector)
 	}
 
+	run := func(ctx context.Context) error {
+		ticker := time.NewTicker(5 * time.Minute)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				level.Debug(l).Log("msg", "Starting delta store pruning", "number_of_stores", len(counterStores)+len(histogramStores))
+				for _, store := range counterStores {
+					store.Prune(ctx)
+				}
+				for _, store := range histogramStores {
+					store.Prune(ctx)
+				}
+				level.Debug(l).Log("msg", "Finished delta store pruning", "number_of_stores", len(counterStores)+len(histogramStores))
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+	}
+
 	return integrations.NewCollectorIntegration(
-		c.Name(), integrations.WithCollectors(gcpCollectors...),
+		c.Name(), integrations.WithCollectors(gcpCollectors...), integrations.WithRunner(run),
 	), nil
 }
 
diff --git a/internal/static/integrations/gcp_exporter/self_pruning_store.go b/internal/static/integrations/gcp_exporter/self_pruning_store.go
new file mode 100644
index 0000000000..7605079b62
--- /dev/null
+++ b/internal/static/integrations/gcp_exporter/self_pruning_store.go
@@ -0,0 +1,103 @@
+package gcp_exporter
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus-community/stackdriver_exporter/collectors"
+	"github.com/tilinna/clock"
+	"go.uber.org/atomic"
+	"google.golang.org/api/monitoring/v3"
+)
+
+// Five minutes in seconds for Unix timestamp comparisons
+const FiveMinutePruneWindow int64 = 5 * 60
+
+type CounterOrHistogramStore[T CounterOrHistogram] interface {
+	Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *T)
+	ListMetrics(metricDescriptorName string) []*T
+}
+
+type CounterOrHistogram interface {
+	collectors.ConstMetric | collectors.HistogramMetric
+}
+
+type SelfPruningDeltaStore[T CounterOrHistogram] struct {
+	wrapping                     CounterOrHistogramStore[T]
+	mux                          sync.Mutex
+	trackedMetricDescriptorNames map[string]struct{}
+	lastListOperationTime        atomic.Int64
+	logger                       log.Logger
+}
+
+// NewSelfPruningDeltaStore provides a configured instance of the SelfPruningDeltaStore, which wraps an existing delta
+// store implementation with support for pruning the store when it's not being used as part of normal operations.
+//
+// The GCP exporter naturally prunes the store over time during normal operations. If the exporter is being used in
+// clustering mode and does not have active GCP targets, this will not happen. If it later has targets assigned, any
+// old counter values will be used, potentially causing invalid rate and increase calculations.
+//
+// This is a short-term fix until clustering-aware components are completed. It ensures the in-memory counters
+// are pruned when an exporter instance no longer has targets assigned.
+func NewSelfPruningDeltaStore[T CounterOrHistogram](l log.Logger, wrapping CounterOrHistogramStore[T]) *SelfPruningDeltaStore[T] {
+	return &SelfPruningDeltaStore[T]{
+		logger:                       l,
+		wrapping:                     wrapping,
+		trackedMetricDescriptorNames: map[string]struct{}{},
+	}
+}
+
+// Increment delegates to the wrapped store.
+// We do not track metric descriptors from here to avoid more locking in a high-throughput function.
+func (s *SelfPruningDeltaStore[T]) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *T) {
+	s.wrapping.Increment(metricDescriptor, currentValue)
+}
+
+// ListMetrics delegates to the wrapped store and updates tracking for the metricDescriptorName based on the results.
+func (s *SelfPruningDeltaStore[T]) ListMetrics(metricDescriptorName string) []*T {
+	s.lastListOperationTime.Store(time.Now().Unix())
+	result := s.wrapping.ListMetrics(metricDescriptorName)
+
+	s.mux.Lock()
+	defer s.mux.Unlock()
+	// We only add a descriptor to tracking when it has results and remove it when it no longer has results
+	_, ok := s.trackedMetricDescriptorNames[metricDescriptorName]
+	if !ok && len(result) > 0 {
+		s.trackedMetricDescriptorNames[metricDescriptorName] = struct{}{}
+	} else if ok && len(result) == 0 {
+		delete(s.trackedMetricDescriptorNames, metricDescriptorName)
+	}
+
+	return result
+}
+
+func (s *SelfPruningDeltaStore[T]) Prune(ctx context.Context) {
+	now := clock.Now(ctx).Unix()
+	if s.shouldPrune(now) {
+		level.Debug(s.logger).Log("msg", "Pruning window breached, starting prune")
+		s.mux.Lock()
+		defer s.mux.Unlock()
+		for descriptor := range s.trackedMetricDescriptorNames {
+			// Exit early if ListMetrics is being called again
+			if !s.shouldPrune(now) {
+				level.Debug(s.logger).Log("msg", "Store no longer needs pruning, stopping")
+				break
+			}
+			// Calling ListMetrics has a side effect of pruning any data outside the configured TTL; we want to make
+			// sure that always continues to happen
+			result := s.wrapping.ListMetrics(descriptor)
+			if len(result) == 0 {
+				delete(s.trackedMetricDescriptorNames, descriptor)
+			}
+			level.Debug(s.logger).Log("msg", "Pruning metric descriptor", "metric_descriptor", descriptor, "metrics_remaining", len(result))
+		}
+		level.Debug(s.logger).Log("msg", "Pruning finished")
+	}
+}
+
+func (s *SelfPruningDeltaStore[T]) shouldPrune(now int64) bool {
+	return (now - s.lastListOperationTime.Load()) > FiveMinutePruneWindow
+}
diff --git a/internal/static/integrations/gcp_exporter/self_pruning_store_test.go b/internal/static/integrations/gcp_exporter/self_pruning_store_test.go
new file mode 100644
index 0000000000..dcece41ba1
--- /dev/null
+++ b/internal/static/integrations/gcp_exporter/self_pruning_store_test.go
@@ -0,0 +1,154 @@
+package gcp_exporter_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/prometheus-community/stackdriver_exporter/collectors"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/tilinna/clock"
+	"golang.org/x/exp/maps"
+	"google.golang.org/api/monitoring/v3"
+
+	"github.com/grafana/alloy/internal/static/integrations/gcp_exporter"
+	"github.com/grafana/alloy/internal/util"
+)
+
+type testStore struct {
+	incrementCounter               int
+	descriptorToListMetricsCounter map[string]int
+	state                          map[string][]*collectors.ConstMetric
+}
+
+func newTestStore(state ...map[string][]*collectors.ConstMetric) *testStore {
+	internalState := map[string][]*collectors.ConstMetric{}
+	if len(state) == 1 {
+		internalState = state[0]
+	}
+	return &testStore{
+		descriptorToListMetricsCounter: map[string]int{},
+		state:                          internalState,
+	}
+}
+
+func (t *testStore) Increment(_ *monitoring.MetricDescriptor, _ *collectors.ConstMetric) {
+	t.incrementCounter++
+}
+
+func (t *testStore) ListMetrics(metricDescriptorName string) []*collectors.ConstMetric {
+	t.descriptorToListMetricsCounter[metricDescriptorName]++
+	return t.state[metricDescriptorName]
+}
+
+func TestSelfPruningDeltaStore_Increment_Delegates(t *testing.T) {
+	counterStore := newTestStore()
+	pruningStore := gcp_exporter.NewSelfPruningDeltaStore[collectors.ConstMetric](util.TestAlloyLogger(t), counterStore)
+	descriptor := &monitoring.MetricDescriptor{Name: "test-descriptor"}
+	currentValue := &collectors.ConstMetric{}
+	pruningStore.Increment(descriptor, currentValue)
+	assert.Equal(t, 1, counterStore.incrementCounter)
+}
+
+func TestSelfPruningDeltaStore_ListMetrics_Delegates(t *testing.T) {
+	counterStore := newTestStore()
+	pruningStore := gcp_exporter.NewSelfPruningDeltaStore[collectors.ConstMetric](util.TestAlloyLogger(t), counterStore)
+	pruningStore.ListMetrics("test-descriptor")
+	assert.Len(t, counterStore.descriptorToListMetricsCounter, 1)
+	assert.Equal(t, 1, counterStore.descriptorToListMetricsCounter["test-descriptor"])
+}
+
+func TestSelfPruningDeltaStore_PruningWorkflow(t *testing.T) {
+	sixMinutesAheadClock := clock.Context(context.Background(), clock.NewMock(time.Now().Add(6*time.Minute)))
+	type testCase struct {
+		name               string
+		storeState         map[string][]*collectors.ConstMetric
+		callsToMakeTo      func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore)
+		expectedCallCounts map[string]int
+	}
+	tests := []testCase{
+		{
+			name: "does nothing when last operation time does not require pruning",
+			storeState: map[string][]*collectors.ConstMetric{
+				"test-descriptor": {{FqName: "test-const-metric"}},
+			},
+			callsToMakeTo: func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore) {
+				// Initialize last operation time
+				store.ListMetrics("test-descriptor")
+				store.Prune(context.Background())
+			},
+			expectedCallCounts: map[string]int{"test-descriptor": 1}, // Once to init last operation time
+		},
+		{
+			name:       "does nothing when no metric descriptors have been tracked",
+			storeState: map[string][]*collectors.ConstMetric{},
+			callsToMakeTo: func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore) {
+				// Initialize last operation time
+				store.ListMetrics("test-descriptor")
+				store.Prune(sixMinutesAheadClock)
+			},
+			expectedCallCounts: map[string]int{"test-descriptor": 1}, // Once to init last operation time
+		},
+		{
+			name: "will prune outstanding descriptors",
+			storeState: map[string][]*collectors.ConstMetric{
+				"test-descriptor": {{FqName: "test-const-metric"}},
+			},
+			callsToMakeTo: func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore) {
+				store.ListMetrics("test-descriptor")
+				store.Prune(sixMinutesAheadClock)
+			},
+			expectedCallCounts: map[string]int{
+				"test-descriptor": 2, // Once to track it and once to prune it
+			},
+		},
+		{
+			name: "will stop pruning a descriptor with no results",
+			storeState: map[string][]*collectors.ConstMetric{
+				"test-descriptor": {{FqName: "test-const-metric"}},
+			},
+			callsToMakeTo: func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore) {
+				store.ListMetrics("test-descriptor")
+				ts.state["test-descriptor"] = []*collectors.ConstMetric{}
+				store.Prune(sixMinutesAheadClock)
+				store.Prune(sixMinutesAheadClock)
+			},
+			expectedCallCounts: map[string]int{
+				"test-descriptor": 2, // Once to track it and once to prune it
+			},
+		},
+		{
+			name: "stops tracking descriptors with no results",
+			storeState: map[string][]*collectors.ConstMetric{
+				"test-descriptor": {{FqName: "test-const-metric"}},
+			},
+			callsToMakeTo: func(store *gcp_exporter.SelfPruningDeltaStore[collectors.ConstMetric], ts *testStore) {
+				// Track it
+				store.ListMetrics("test-descriptor")
+				// Make it empty
+				ts.state["test-descriptor"] = []*collectors.ConstMetric{}
+				// Try to untrack it
+				store.ListMetrics("test-descriptor")
+				store.Prune(sixMinutesAheadClock)
+			},
+			expectedCallCounts: map[string]int{
+				"test-descriptor": 2, // Once to track it and once to untrack it
+			},
+		},
+	}
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ts := newTestStore(tc.storeState)
+			store := gcp_exporter.NewSelfPruningDeltaStore[collectors.ConstMetric](
+				util.TestAlloyLogger(t),
+				ts)
+			tc.callsToMakeTo(store, ts)
+
+			require.ElementsMatch(t, maps.Keys(tc.expectedCallCounts), maps.Keys(ts.descriptorToListMetricsCounter))
+			for descriptor, callCount := range tc.expectedCallCounts {
+				assert.Equal(t, callCount, ts.descriptorToListMetricsCounter[descriptor], "descriptor %s had an incorrect call count", descriptor)
+			}
+		})
+	}
+}
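For context, a minimal standalone sketch of how the new store is wired and pruned, based only on the integration and test code above. It is not part of the change: the `main` package, logger setup, and `example/descriptor` name are illustrative, it assumes the `delta` package shipped with stackdriver_exporter (the same one `gcp_exporter.go` uses), and it would only build inside the Alloy module because `gcp_exporter` is an internal package.

```go
package main

import (
	"context"
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus-community/stackdriver_exporter/collectors"
	"github.com/prometheus-community/stackdriver_exporter/delta"
	"github.com/tilinna/clock"

	"github.com/grafana/alloy/internal/static/integrations/gcp_exporter"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// Wrap the exporter's in-memory counter store, as NewIntegration does, so stale
	// delta state can be pruned when this instance stops listing metrics (for example,
	// when clustering assigns its targets elsewhere).
	store := gcp_exporter.NewSelfPruningDeltaStore[collectors.ConstMetric](
		logger,
		delta.NewInMemoryCounterStore(logger, 30*time.Minute),
	)

	// Normal operation: ListMetrics records the last list time and tracks the
	// descriptor while it has results ("example/descriptor" is illustrative).
	store.ListMetrics("example/descriptor")

	// Prune reads its clock from the context. A mock clock six minutes ahead breaches
	// the five-minute window, so tracked descriptors are re-listed, which lets the
	// wrapped store drop any data outside its TTL.
	ctx := clock.Context(context.Background(), clock.NewMock(time.Now().Add(6*time.Minute)))
	store.Prune(ctx)
}
```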