diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e542e54e9..b9ec013d9b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [FEATURE] Ruler: Support filtering results from rule status endpoint by `file`, `rule_group` and `rule_name`. #5291 * [FEATURE] Ingester: add experimental support for creating tokens by using spread minimizing strategy. This can be enabled with `-ingester.ring.token-generation-strategy: spread-minimizing` and `-ingester.ring.spread-minimizing-zones: `. In that case `-ingester.ring.tokens-file-path` must be empty. #5308 #5324 * [FEATURE] Ingester: add experimental support to compact the TSDB Head when the number of in-memory series is equal or greater than `-blocks-storage.tsdb.early-head-compaction-min-in-memory-series`, and the ingester estimates that the per-tenant TSDB Head compaction will reduce in-memory series by at least `-blocks-storage.tsdb.early-head-compaction-min-estimated-series-reduction-percentage`. #5371 +* [FEATURE] Ingester: add new metrics for tracking native histograms in active series: `cortex_ingester_active_native_histogram_series`, `cortex_ingester_active_native_histogram_series_custom_tracker`, `cortex_ingester_active_native_histogram_buckets`, `cortex_ingester_active_native_histogram_buckets_custom_tracker`. The first 2 are the subsets of the existing and unmodified `cortex_ingester_active_series` and `cortex_ingester_active_series_custom_tracker` respectively, only tracking native histogram series, and the last 2 are the equivalents for tracking the number of buckets in native histogram series. #5318 * [ENHANCEMENT] Overrides-exporter: Add new metrics for write path and alertmanager (`max_global_metadata_per_user`, `max_global_metadata_per_metric`, `request_rate`, `request_burst_size`, `alertmanager_notification_rate_limit`, `alertmanager_max_dispatcher_aggregation_groups`, `alertmanager_max_alerts_count`, `alertmanager_max_alerts_size_bytes`) and added flag `-overrides-exporter.enabled-metrics` to explicitly configure desired metrics, e.g. `-overrides-exporter.enabled-metrics=request_rate,ingestion_rate`. Default value for this flag is: `ingestion_rate,ingestion_burst_size,max_global_series_per_user,max_global_series_per_metric,max_global_exemplars_per_user,max_fetched_chunks_per_query,max_fetched_series_per_query,ruler_max_rules_per_rule_group,ruler_max_rule_groups_per_tenant`. #5376 * [ENHANCEMENT] Cardinality API: When zone aware replication is enabled, the label values cardinality API can now tolerate single zone failure #5178 * [ENHANCEMENT] Distributor: optimize sending requests to ingesters when incoming requests don't need to be modified. #5137 #5389 diff --git a/pkg/ingester/activeseries/active_postings_test.go b/pkg/ingester/activeseries/active_postings_test.go index 350822f321..e13bcbc913 100644 --- a/pkg/ingester/activeseries/active_postings_test.go +++ b/pkg/ingester/activeseries/active_postings_test.go @@ -28,11 +28,11 @@ func TestPostings_Expand(t *testing.T) { // Update each series at a different time according to its index. for i := range allStorageRefs { - activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0)) + activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0), -1) } valid := activeSeries.Purge(mockedTime) - allActive, _ := activeSeries.ActiveWithMatchers() + allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 2, allActive) @@ -60,11 +60,11 @@ func TestPostings_Seek(t *testing.T) { // Update each series at a different time according to its index. for i := range allStorageRefs { - activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0)) + activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0), -1) } valid := activeSeries.Purge(mockedTime) - allActive, _ := activeSeries.ActiveWithMatchers() + allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 2, allActive) @@ -92,11 +92,11 @@ func TestPostings_SeekToEnd(t *testing.T) { // Update each series at a different time according to its index. for i := range allStorageRefs { - activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0)) + activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0), -1) } valid := activeSeries.Purge(mockedTime) - allActive, _ := activeSeries.ActiveWithMatchers() + allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 0, allActive) diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go index e9999151f4..c5b68ed120 100644 --- a/pkg/ingester/activeseries/active_series.go +++ b/pkg/ingester/activeseries/active_series.go @@ -57,16 +57,21 @@ type seriesStripe struct { // without holding the lock -- hence the atomic). oldestEntryTs atomic.Int64 - mu sync.RWMutex - refs map[uint64]seriesEntry - active int // Number of active entries in this stripe. Only decreased during purge or clear. - activeMatching []int // Number of active entries in this stripe matching each matcher of the configured Matchers. + mu sync.RWMutex + refs map[uint64]seriesEntry + active int // Number of active entries in this stripe. Only decreased during purge or clear. + activeMatching []int // Number of active entries in this stripe matching each matcher of the configured Matchers. + activeNativeHistograms int // Number of active entries (only native histograms) in this stripe. Only decreased during purge or clear. + activeMatchingNativeHistograms []int // Number of active entries (only native histograms) in this stripe matching each matcher of the configured Matchers. + activeNativeHistogramBuckets int // Number of buckets in active native histogram entries in this stripe. Only decreased during purge or clear. + activeMatchingNativeHistogramBuckets []int // Number of buckets in active native histogram entries in this stripe matching each matcher of the configured Matchers. } // seriesEntry holds a timestamp for single series. type seriesEntry struct { - nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe. - matches preAllocDynamicSlice // Index of the matcher matching + nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe. + matches preAllocDynamicSlice // Index of the matcher matching + numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram. } func NewActiveSeries(asm *Matchers, timeout time.Duration) *ActiveSeries { @@ -104,10 +109,11 @@ func (c *ActiveSeries) CurrentConfig() CustomTrackersConfig { } // UpdateSeries updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet. -func (c *ActiveSeries) UpdateSeries(series labels.Labels, ref uint64, now time.Time) { +// Pass -1 in numNativeHistogramBuckets if the series is not a native histogram series. +func (c *ActiveSeries) UpdateSeries(series labels.Labels, ref uint64, now time.Time, numNativeHistogramBuckets int) { stripeID := ref % numStripes - c.stripes[stripeID].updateSeriesTimestamp(now, series, ref) + c.stripes[stripeID].updateSeriesTimestamp(now, series, ref, numNativeHistogramBuckets) } // Purge purges expired entries and returns true if enough time has passed since @@ -134,31 +140,41 @@ func (c *ActiveSeries) ContainsRef(ref uint64) bool { return c.stripes[stripeID].containsRef(ref) } -// Active returns the total number of active series. This method does not purge -// expired entries, so Purge should be called periodically. -func (c *ActiveSeries) Active() int { - total := 0 +// Active returns the total numbers of active series, active native +// histogram series, and buckets of those native histogram series. +// This method does not purge expired entries, so Purge should be +// called periodically. +func (c *ActiveSeries) Active() (total, totalNativeHistograms, totalNativeHistogramBuckets int) { for s := 0; s < numStripes; s++ { - total += c.stripes[s].getTotal() + all, histograms, buckets := c.stripes[s].getTotal() + total += all + totalNativeHistograms += histograms + totalNativeHistogramBuckets += buckets } - return total + return } // ActiveWithMatchers returns the total number of active series, as well as a // slice of active series matching each one of the custom trackers provided (in -// the same order as custom trackers are defined). This method does not purge +// the same order as custom trackers are defined), and then the same thing for +// only active series that are native histograms, then the same for the number +// of buckets in those active native histogram series. This method does not purge // expired entries, so Purge should be called periodically. -func (c *ActiveSeries) ActiveWithMatchers() (int, []int) { +func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, totalNativeHistograms int, totalMatchingNativeHistograms []int, totalNativeHistogramBuckets int, totalMatchingNativeHistogramBuckets []int) { c.matchersMutex.RLock() defer c.matchersMutex.RUnlock() - total := 0 - totalMatching := resizeAndClear(len(c.matchers.MatcherNames()), nil) + totalMatching = resizeAndClear(len(c.matchers.MatcherNames()), nil) + totalMatchingNativeHistograms = resizeAndClear(len(c.matchers.MatcherNames()), nil) + totalMatchingNativeHistogramBuckets = resizeAndClear(len(c.matchers.MatcherNames()), nil) for s := 0; s < numStripes; s++ { - total += c.stripes[s].getTotalAndUpdateMatching(totalMatching) + all, histograms, buckets := c.stripes[s].getTotalAndUpdateMatching(totalMatching, totalMatchingNativeHistograms, totalMatchingNativeHistogramBuckets) + total += all + totalNativeHistograms += histograms + totalNativeHistogramBuckets += buckets } - return total, totalMatching + return } func (s *seriesStripe) containsRef(ref uint64) bool { @@ -170,15 +186,18 @@ func (s *seriesStripe) containsRef(ref uint64) bool { } // getTotal will return the total active series in the stripe -func (s *seriesStripe) getTotal() int { +// and the total active series that are native histograms +// and the total number of buckets in active native histogram series +func (s *seriesStripe) getTotal() (int, int, int) { s.mu.RLock() defer s.mu.RUnlock() - return s.active + return s.active, s.activeNativeHistograms, s.activeNativeHistogramBuckets } // getTotalAndUpdateMatching will return the total active series in the stripe and also update the slice provided -// with each matcher's total. -func (s *seriesStripe) getTotalAndUpdateMatching(matching []int) int { +// with each matcher's total, and will also do the same for total active series that are native histograms +// as well as the total number of buckets in active native histogram series +func (s *seriesStripe) getTotalAndUpdateMatching(matching []int, matchingNativeHistograms []int, matchingNativeHistogramBuckets []int) (int, int, int) { s.mu.RLock() defer s.mu.RUnlock() @@ -186,17 +205,23 @@ func (s *seriesStripe) getTotalAndUpdateMatching(matching []int) int { for i, a := range s.activeMatching { matching[i] += a } + for i, a := range s.activeMatchingNativeHistograms { + matchingNativeHistograms[i] += a + } + for i, a := range s.activeMatchingNativeHistogramBuckets { + matchingNativeHistogramBuckets[i] += a + } - return s.active + return s.active, s.activeNativeHistograms, s.activeNativeHistogramBuckets } -func (s *seriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, ref uint64) { +func (s *seriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, ref uint64, numNativeHistogramBuckets int) { nowNanos := now.UnixNano() - e := s.findEntryForSeries(ref) + e, needsUpdating := s.findEntryForSeries(ref, numNativeHistogramBuckets) entryTimeSet := false - if e == nil { - e, entryTimeSet = s.findOrCreateEntryForSeries(ref, series, nowNanos) + if e == nil || needsUpdating { + e, entryTimeSet = s.findAndUpdateOrCreateEntryForSeries(ref, series, nowNanos, numNativeHistogramBuckets) } if !entryTimeSet { @@ -216,13 +241,14 @@ func (s *seriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels } } -func (s *seriesStripe) findEntryForSeries(ref uint64) *atomic.Int64 { +func (s *seriesStripe) findEntryForSeries(ref uint64, numNativeHistogramBuckets int) (*atomic.Int64, bool) { s.mu.RLock() defer s.mu.RUnlock() - return s.refs[ref].nanos + entry := s.refs[ref] + return entry.nanos, entry.numNativeHistogramBuckets != numNativeHistogramBuckets } -func (s *seriesStripe) findOrCreateEntryForSeries(ref uint64, series labels.Labels, nowNanos int64) (*atomic.Int64, bool) { +func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref uint64, series labels.Labels, nowNanos int64, numNativeHistogramBuckets int) (*atomic.Int64, bool) { s.mu.Lock() defer s.mu.Unlock() @@ -230,6 +256,38 @@ func (s *seriesStripe) findOrCreateEntryForSeries(ref uint64, series labels.Labe // This repeats findEntryForSeries(), but under write lock. entry, ok := s.refs[ref] if ok { + if entry.numNativeHistogramBuckets != numNativeHistogramBuckets { + matches := s.matchers.matches(series) + matchesLen := matches.len() + if numNativeHistogramBuckets >= 0 && entry.numNativeHistogramBuckets >= 0 { + // change number of buckets but still a histogram + diff := numNativeHistogramBuckets - entry.numNativeHistogramBuckets + s.activeNativeHistogramBuckets += diff + for i := 0; i < matchesLen; i++ { + s.activeMatchingNativeHistogramBuckets[matches.get(i)] += diff + } + } else if numNativeHistogramBuckets >= 0 { + // change from float to histogram + s.activeNativeHistograms++ + s.activeNativeHistogramBuckets += numNativeHistogramBuckets + for i := 0; i < matchesLen; i++ { + match := matches.get(i) + s.activeMatchingNativeHistograms[match]++ + s.activeMatchingNativeHistogramBuckets[match] += numNativeHistogramBuckets + } + } else { + // change from histogram to float + s.activeNativeHistograms-- + s.activeNativeHistogramBuckets -= entry.numNativeHistogramBuckets + for i := 0; i < matchesLen; i++ { + match := matches.get(i) + s.activeMatchingNativeHistograms[match]-- + s.activeMatchingNativeHistogramBuckets[match] -= entry.numNativeHistogramBuckets + } + } + entry.numNativeHistogramBuckets = numNativeHistogramBuckets + s.refs[ref] = entry + } return entry.nanos, false } @@ -237,13 +295,23 @@ func (s *seriesStripe) findOrCreateEntryForSeries(ref uint64, series labels.Labe matchesLen := matches.len() s.active++ + if numNativeHistogramBuckets >= 0 { + s.activeNativeHistograms++ + s.activeNativeHistogramBuckets += numNativeHistogramBuckets + } for i := 0; i < matchesLen; i++ { - s.activeMatching[matches.get(i)]++ + match := matches.get(i) + s.activeMatching[match]++ + if numNativeHistogramBuckets >= 0 { + s.activeMatchingNativeHistograms[match]++ + s.activeMatchingNativeHistogramBuckets[match] += numNativeHistogramBuckets + } } e := seriesEntry{ - nanos: atomic.NewInt64(nowNanos), - matches: matches, + nanos: atomic.NewInt64(nowNanos), + matches: matches, + numNativeHistogramBuckets: numNativeHistogramBuckets, } s.refs[ref] = e @@ -259,8 +327,12 @@ func (s *seriesStripe) clear() { s.oldestEntryTs.Store(0) s.refs = map[uint64]seriesEntry{} s.active = 0 + s.activeNativeHistograms = 0 + s.activeNativeHistogramBuckets = 0 for i := range s.activeMatching { s.activeMatching[i] = 0 + s.activeMatchingNativeHistograms[i] = 0 + s.activeMatchingNativeHistogramBuckets[i] = 0 } } @@ -272,8 +344,12 @@ func (s *seriesStripe) reinitialize(asm *Matchers) { s.oldestEntryTs.Store(0) s.refs = map[uint64]seriesEntry{} s.active = 0 + s.activeNativeHistograms = 0 + s.activeNativeHistogramBuckets = 0 s.matchers = asm s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching) + s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms) + s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets) } func (s *seriesStripe) purge(keepUntil time.Time) { @@ -287,7 +363,11 @@ func (s *seriesStripe) purge(keepUntil time.Time) { defer s.mu.Unlock() s.active = 0 + s.activeNativeHistograms = 0 + s.activeNativeHistogramBuckets = 0 s.activeMatching = resizeAndClear(len(s.activeMatching), s.activeMatching) + s.activeMatchingNativeHistograms = resizeAndClear(len(s.activeMatchingNativeHistograms), s.activeMatchingNativeHistograms) + s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(s.activeMatchingNativeHistogramBuckets), s.activeMatchingNativeHistogramBuckets) oldest := int64(math.MaxInt64) for ref, entry := range s.refs { @@ -298,9 +378,18 @@ func (s *seriesStripe) purge(keepUntil time.Time) { } s.active++ + if entry.numNativeHistogramBuckets >= 0 { + s.activeNativeHistograms++ + s.activeNativeHistogramBuckets += entry.numNativeHistogramBuckets + } ml := entry.matches.len() for i := 0; i < ml; i++ { - s.activeMatching[entry.matches.get(i)]++ + match := entry.matches.get(i) + s.activeMatching[match]++ + if entry.numNativeHistogramBuckets >= 0 { + s.activeMatchingNativeHistograms[match]++ + s.activeMatchingNativeHistogramBuckets[match] += entry.numNativeHistogramBuckets + } } if ts < oldest { oldest = ts diff --git a/pkg/ingester/activeseries/active_series_test.go b/pkg/ingester/activeseries/active_series_test.go index d0523f902f..5b8225eaa0 100644 --- a/pkg/ingester/activeseries/active_series_test.go +++ b/pkg/ingester/activeseries/active_series_test.go @@ -22,37 +22,105 @@ const DefaultTimeout = 5 * time.Minute func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { ref1, ls1 := uint64(1), labels.FromStrings("a", "1") ref2, ls2 := uint64(2), labels.FromStrings("a", "2") + ref3, ls3 := uint64(3), labels.FromStrings("a", "3") + ref4, ls4 := uint64(4), labels.FromStrings("a", "4") c := NewActiveSeries(&Matchers{}, DefaultTimeout) valid := c.Purge(time.Now()) - allActive, activeMatching := c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers() assert.Equal(t, 0, allActive) assert.Nil(t, activeMatching) - assert.True(t, valid) + assert.Equal(t, 0, allActiveHistograms) + assert.Nil(t, activeMatchingHistograms) + assert.Equal(t, 0, allActiveBuckets) + assert.Nil(t, activeMatchingBuckets) - c.UpdateSeries(ls1, ref1, time.Now()) + c.UpdateSeries(ls1, ref1, time.Now(), -1) valid = c.Purge(time.Now()) - allActive, _ = c.ActiveWithMatchers() - assert.Equal(t, 1, allActive) assert.True(t, valid) - allActive = c.Active() + allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() + assert.Equal(t, 1, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() assert.Equal(t, 1, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls1, ref1, time.Now()) + c.UpdateSeries(ls1, ref1, time.Now(), -1) valid = c.Purge(time.Now()) - allActive, _ = c.ActiveWithMatchers() - assert.Equal(t, 1, allActive) assert.True(t, valid) - allActive = c.Active() + allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() + assert.Equal(t, 1, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() assert.Equal(t, 1, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls2, ref2, time.Now()) + c.UpdateSeries(ls2, ref2, time.Now(), -1) valid = c.Purge(time.Now()) - allActive, _ = c.ActiveWithMatchers() - assert.Equal(t, 2, allActive) assert.True(t, valid) - allActive = c.Active() + allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 2, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() + assert.Equal(t, 2, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) + + c.UpdateSeries(ls3, ref3, time.Now(), 5) + valid = c.Purge(time.Now()) + assert.True(t, valid) + allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() + assert.Equal(t, 3, allActive) + assert.Equal(t, 1, allActiveHistograms) + assert.Equal(t, 5, allActiveBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() + assert.Equal(t, 3, allActive) + assert.Equal(t, 1, allActiveHistograms) + assert.Equal(t, 5, allActiveBuckets) + + c.UpdateSeries(ls4, ref4, time.Now(), 3) + valid = c.Purge(time.Now()) + assert.True(t, valid) + allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() + assert.Equal(t, 4, allActive) + assert.Equal(t, 2, allActiveHistograms) + assert.Equal(t, 8, allActiveBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() + assert.Equal(t, 4, allActive) + assert.Equal(t, 2, allActiveHistograms) + assert.Equal(t, 8, allActiveBuckets) + + // more buckets for a histogram + c.UpdateSeries(ls3, ref3, time.Now(), 7) + valid = c.Purge(time.Now()) + assert.True(t, valid) + allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() + assert.Equal(t, 4, allActive) + assert.Equal(t, 2, allActiveHistograms) + assert.Equal(t, 10, allActiveBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() + assert.Equal(t, 4, allActive) + assert.Equal(t, 2, allActiveHistograms) + assert.Equal(t, 10, allActiveBuckets) + + // changing a metric from histogram to float + c.UpdateSeries(ls4, ref4, time.Now(), -1) + valid = c.Purge(time.Now()) + assert.True(t, valid) + allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() + assert.Equal(t, 4, allActive) + assert.Equal(t, 1, allActiveHistograms) + assert.Equal(t, 7, allActiveBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() + assert.Equal(t, 4, allActive) + assert.Equal(t, 1, allActiveHistograms) + assert.Equal(t, 7, allActiveBuckets) } func TestActiveSeries_ContainsRef(t *testing.T) { @@ -74,7 +142,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) { // Update each series with a different timestamp according to each index for i := 0; i < len(series); i++ { - c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0)) + c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1) } c.purge(time.Unix(int64(ttl), 0)) @@ -83,10 +151,10 @@ func TestActiveSeries_ContainsRef(t *testing.T) { // because the first ttl series should be purged exp := len(series) - (ttl) valid := c.Purge(mockedTime) - allActive, activeMatching := c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, exp, allActive) assert.Nil(t, activeMatching) - assert.True(t, valid) for i := 0; i < len(series); i++ { assert.Equal(t, i >= ttl, c.ContainsRef(refs[i])) @@ -99,53 +167,147 @@ func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) { ref1, ls1 := uint64(1), labels.FromStrings("a", "1") ref2, ls2 := uint64(2), labels.FromStrings("a", "2") ref3, ls3 := uint64(3), labels.FromStrings("a", "3") + ref4, ls4 := uint64(4), labels.FromStrings("a", "4") + ref5, ls5 := uint64(5), labels.FromStrings("a", "5") - asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3"}`})) + asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) c := NewActiveSeries(asm, DefaultTimeout) valid := c.Purge(time.Now()) - allActive, activeMatching := c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers() assert.Equal(t, 0, allActive) assert.Equal(t, []int{0}, activeMatching) - assert.True(t, valid) - allActive = c.Active() + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, []int{0}, activeMatchingHistograms) + assert.Equal(t, 0, allActiveBuckets) + assert.Equal(t, []int{0}, activeMatchingBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() assert.Equal(t, 0, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls1, ref1, time.Now()) + c.UpdateSeries(ls1, ref1, time.Now(), -1) valid = c.Purge(time.Now()) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) assert.Equal(t, []int{0}, activeMatching) - assert.True(t, valid) - allActive = c.Active() + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, []int{0}, activeMatchingHistograms) + assert.Equal(t, 0, allActiveBuckets) + assert.Equal(t, []int{0}, activeMatchingBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() assert.Equal(t, 1, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls2, ref2, time.Now()) + c.UpdateSeries(ls2, ref2, time.Now(), -1) valid = c.Purge(time.Now()) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 2, allActive) assert.Equal(t, []int{1}, activeMatching) - assert.True(t, valid) - allActive = c.Active() + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, []int{0}, activeMatchingHistograms) + assert.Equal(t, 0, allActiveBuckets) + assert.Equal(t, []int{0}, activeMatchingBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() assert.Equal(t, 2, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls3, ref3, time.Now()) + c.UpdateSeries(ls3, ref3, time.Now(), -1) valid = c.Purge(time.Now()) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 3, allActive) assert.Equal(t, []int{2}, activeMatching) - assert.True(t, valid) - allActive = c.Active() + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, []int{0}, activeMatchingHistograms) + assert.Equal(t, 0, allActiveBuckets) + assert.Equal(t, []int{0}, activeMatchingBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() assert.Equal(t, 3, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls3, ref3, time.Now()) + c.UpdateSeries(ls3, ref3, time.Now(), -1) valid = c.Purge(time.Now()) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 3, allActive) assert.Equal(t, []int{2}, activeMatching) - assert.True(t, valid) - allActive = c.Active() + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, []int{0}, activeMatchingHistograms) + assert.Equal(t, 0, allActiveBuckets) + assert.Equal(t, []int{0}, activeMatchingBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() assert.Equal(t, 3, allActive) + assert.Equal(t, 0, allActiveHistograms) + assert.Equal(t, 0, allActiveBuckets) + + c.UpdateSeries(ls4, ref4, time.Now(), 3) + valid = c.Purge(time.Now()) + assert.True(t, valid) + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() + assert.Equal(t, 4, allActive) + assert.Equal(t, []int{3}, activeMatching) + assert.Equal(t, 1, allActiveHistograms) + assert.Equal(t, []int{1}, activeMatchingHistograms) + assert.Equal(t, 3, allActiveBuckets) + assert.Equal(t, []int{3}, activeMatchingBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() + assert.Equal(t, 4, allActive) + assert.Equal(t, 1, allActiveHistograms) + assert.Equal(t, 3, allActiveBuckets) + + c.UpdateSeries(ls5, ref5, time.Now(), 5) + valid = c.Purge(time.Now()) + assert.True(t, valid) + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() + assert.Equal(t, 5, allActive) + assert.Equal(t, []int{3}, activeMatching) + assert.Equal(t, 2, allActiveHistograms) + assert.Equal(t, []int{1}, activeMatchingHistograms) + assert.Equal(t, 8, allActiveBuckets) + assert.Equal(t, []int{3}, activeMatchingBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() + assert.Equal(t, 5, allActive) + assert.Equal(t, 2, allActiveHistograms) + assert.Equal(t, 8, allActiveBuckets) + + // changing a metric from float to histogram + c.UpdateSeries(ls3, ref3, time.Now(), 6) + valid = c.Purge(time.Now()) + assert.True(t, valid) + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() + assert.Equal(t, 5, allActive) + assert.Equal(t, []int{3}, activeMatching) + assert.Equal(t, 3, allActiveHistograms) + assert.Equal(t, []int{2}, activeMatchingHistograms) + assert.Equal(t, 14, allActiveBuckets) + assert.Equal(t, []int{9}, activeMatchingBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() + assert.Equal(t, 5, allActive) + assert.Equal(t, 3, allActiveHistograms) + assert.Equal(t, 14, allActiveBuckets) + + // fewer (zero) buckets for a histogram + c.UpdateSeries(ls4, ref4, time.Now(), 0) + valid = c.Purge(time.Now()) + assert.True(t, valid) + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() + assert.Equal(t, 5, allActive) + assert.Equal(t, []int{3}, activeMatching) + assert.Equal(t, 3, allActiveHistograms) + assert.Equal(t, []int{2}, activeMatchingHistograms) + assert.Equal(t, 11, allActiveBuckets) + assert.Equal(t, []int{6}, activeMatchingBuckets) + allActive, allActiveHistograms, allActiveBuckets = c.Active() + assert.Equal(t, 5, allActive) + assert.Equal(t, 3, allActiveHistograms) + assert.Equal(t, 11, allActiveBuckets) } func labelsWithHashCollision() (labels.Labels, labels.Labels) { @@ -171,13 +333,13 @@ func TestActiveSeries_ShouldCorrectlyHandleHashCollisions(t *testing.T) { ref1, ref2 := uint64(1), uint64(2) c := NewActiveSeries(&Matchers{}, DefaultTimeout) - c.UpdateSeries(ls1, ref1, time.Now()) - c.UpdateSeries(ls2, ref2, time.Now()) + c.UpdateSeries(ls1, ref1, time.Now(), -1) + c.UpdateSeries(ls2, ref2, time.Now(), -1) valid := c.Purge(time.Now()) - allActive, _ := c.ActiveWithMatchers() - assert.Equal(t, 2, allActive) assert.True(t, valid) + allActive, _, _, _, _, _ := c.ActiveWithMatchers() + assert.Equal(t, 2, allActive) } func TestActiveSeries_Purge_NoMatchers(t *testing.T) { @@ -198,7 +360,7 @@ func TestActiveSeries_Purge_NoMatchers(t *testing.T) { c := NewActiveSeries(&Matchers{}, DefaultTimeout) for i := 0; i < len(series); i++ { - c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0)) + c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1) } c.purge(time.Unix(int64(ttl), 0)) @@ -208,10 +370,10 @@ func TestActiveSeries_Purge_NoMatchers(t *testing.T) { exp := len(series) - (ttl) // Purge is not intended to purge valid := c.Purge(mockedTime) - allActive, activeMatching := c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, exp, allActive) assert.Nil(t, activeMatching) - assert.True(t, valid) }) } } @@ -240,7 +402,7 @@ func TestActiveSeries_Purge_WithMatchers(t *testing.T) { expMatchingSeries := 0 for i, s := range series { - c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0)) + c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1) // if this series is matching, and they're within the ttl if asm.matchers[0].Matches(s) && i >= ttl { @@ -253,10 +415,10 @@ func TestActiveSeries_Purge_WithMatchers(t *testing.T) { c.purge(time.Unix(int64(ttl), 0)) valid := c.Purge(mockedTime) - allActive, activeMatching := c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, exp, allActive) assert.Equal(t, []int{expMatchingSeries}, activeMatching) - assert.True(t, valid) }) } } @@ -268,29 +430,29 @@ func TestActiveSeries_PurgeOpt(t *testing.T) { currentTime := time.Now() c := NewActiveSeries(&Matchers{}, 59*time.Second) - c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute)) - c.UpdateSeries(ls2, ref2, currentTime) + c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute), -1) + c.UpdateSeries(ls2, ref2, currentTime, -1) valid := c.Purge(currentTime) - allActive, _ := c.ActiveWithMatchers() - assert.Equal(t, 1, allActive) assert.True(t, valid) + allActive, _, _, _, _, _ := c.ActiveWithMatchers() + assert.Equal(t, 1, allActive) - c.UpdateSeries(ls1, ref1, currentTime.Add(-1*time.Minute)) - c.UpdateSeries(ls2, ref2, currentTime) + c.UpdateSeries(ls1, ref1, currentTime.Add(-1*time.Minute), -1) + c.UpdateSeries(ls2, ref2, currentTime, -1) valid = c.Purge(currentTime) - allActive, _ = c.ActiveWithMatchers() - assert.Equal(t, 1, allActive) assert.True(t, valid) + allActive, _, _, _, _, _ = c.ActiveWithMatchers() + assert.Equal(t, 1, allActive) // This will *not* update the series, since there is already newer timestamp. - c.UpdateSeries(ls2, ref2, currentTime.Add(-1*time.Minute)) + c.UpdateSeries(ls2, ref2, currentTime.Add(-1*time.Minute), -1) valid = c.Purge(currentTime) - allActive, _ = c.ActiveWithMatchers() - assert.Equal(t, 1, allActive) assert.True(t, valid) + allActive, _, _, _, _, _ = c.ActiveWithMatchers() + assert.Equal(t, 1, allActive) } func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) { @@ -305,17 +467,17 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) { c := NewActiveSeries(asm, DefaultTimeout) valid := c.Purge(currentTime) - allActive, activeMatching := c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, 0, allActive) assert.Equal(t, []int{0}, activeMatching) - assert.True(t, valid) - c.UpdateSeries(ls1, ref1, currentTime) + c.UpdateSeries(ls1, ref1, currentTime, -1) valid = c.Purge(currentTime) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) assert.Equal(t, []int{1}, activeMatching) - assert.True(t, valid) c.ReloadMatchers(asm, currentTime) valid = c.Purge(currentTime) @@ -323,25 +485,25 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) { // Adding timeout time to make Purge results valid. currentTime = currentTime.Add(DefaultTimeout) - c.UpdateSeries(ls1, ref1, currentTime) - c.UpdateSeries(ls2, ref2, currentTime) + c.UpdateSeries(ls1, ref1, currentTime, -1) + c.UpdateSeries(ls2, ref2, currentTime, -1) valid = c.Purge(currentTime) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 2, allActive) assert.Equal(t, []int{2}, activeMatching) - assert.True(t, valid) asmWithLessMatchers := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{})) c.ReloadMatchers(asmWithLessMatchers, currentTime) // Adding timeout time to make Purge results valid. currentTime = currentTime.Add(DefaultTimeout) - c.UpdateSeries(ls3, ref3, currentTime) + c.UpdateSeries(ls3, ref3, currentTime, -1) valid = c.Purge(currentTime) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) assert.Equal(t, []int(nil), activeMatching) - assert.True(t, valid) asmWithMoreMatchers := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{ "a": `{a="3"}`, @@ -351,12 +513,12 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) { // Adding timeout time to make Purge results valid. currentTime = currentTime.Add(DefaultTimeout) - c.UpdateSeries(ls4, ref4, currentTime) + c.UpdateSeries(ls4, ref4, currentTime, -1) valid = c.Purge(currentTime) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) assert.Equal(t, []int{0, 1}, activeMatching) - assert.True(t, valid) } func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) { @@ -370,17 +532,17 @@ func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) { currentTime := time.Now() c := NewActiveSeries(asm, DefaultTimeout) valid := c.Purge(currentTime) - allActive, activeMatching := c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, 0, allActive) assert.Equal(t, []int{0, 0}, activeMatching) - assert.True(t, valid) - c.UpdateSeries(ls1, ref1, currentTime) + c.UpdateSeries(ls1, ref1, currentTime, -1) valid = c.Purge(currentTime) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) assert.Equal(t, []int{1, 1}, activeMatching) - assert.True(t, valid) asm = NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{ "foo": `{a=~.+}`, @@ -391,10 +553,10 @@ func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) { // Adding timeout time to make Purge results valid. currentTime = currentTime.Add(DefaultTimeout) valid = c.Purge(currentTime) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 0, allActive) assert.Equal(t, []int{0}, activeMatching) - assert.True(t, valid) } func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) { @@ -409,17 +571,17 @@ func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) { c := NewActiveSeries(asm, DefaultTimeout) valid := c.Purge(currentTime) - allActive, activeMatching := c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, 0, allActive) assert.Equal(t, []int{0, 0}, activeMatching) - assert.True(t, valid) - c.UpdateSeries(ls1, ref1, currentTime) + c.UpdateSeries(ls1, ref1, currentTime, -1) valid = c.Purge(currentTime) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) assert.Equal(t, []int{1, 1}, activeMatching) - assert.True(t, valid) asm = NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{ "foo": `{b=~.+}`, @@ -432,10 +594,10 @@ func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) { currentTime = currentTime.Add(DefaultTimeout) valid = c.Purge(currentTime) - allActive, activeMatching = c.ActiveWithMatchers() + assert.True(t, valid) + allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 0, allActive) assert.Equal(t, []int{0, 0}, activeMatching) - assert.True(t, valid) } var activeSeriesTestGoroutines = []int{50, 100, 500} @@ -468,7 +630,7 @@ func benchmarkActiveSeriesConcurrencySingleSeries(b *testing.B, goroutines int) for ix := 0; ix < max; ix++ { now = now.Add(time.Duration(ix) * time.Millisecond) - c.UpdateSeries(series, ref, now) + c.UpdateSeries(series, ref, now, -1) } }() } @@ -523,7 +685,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) { c := NewActiveSeries(&Matchers{}, DefaultTimeout) for round := 0; round <= tt.nRounds; round++ { for ix := 0; ix < tt.nSeries; ix++ { - c.UpdateSeries(series[ix], refs[ix], time.Unix(0, now)) + c.UpdateSeries(series[ix], refs[ix], time.Unix(0, now), -1) now++ } } @@ -560,30 +722,30 @@ func benchmarkPurge(b *testing.B, twice bool) { // Prepare series for ix, s := range series { if ix < numExpiresSeries { - c.UpdateSeries(s, refs[ix], currentTime.Add(-DefaultTimeout)) + c.UpdateSeries(s, refs[ix], currentTime.Add(-DefaultTimeout), -1) } else { - c.UpdateSeries(s, refs[ix], currentTime) + c.UpdateSeries(s, refs[ix], currentTime, -1) } } valid := c.Purge(currentTime) - allActive, _ := c.ActiveWithMatchers() - assert.Equal(b, numSeries, allActive) assert.True(b, valid) + allActive, _, _, _, _, _ := c.ActiveWithMatchers() + assert.Equal(b, numSeries, allActive) b.StartTimer() // Purge is going to purge everything currentTime = currentTime.Add(DefaultTimeout) valid = c.Purge(currentTime) - allActive, _ = c.ActiveWithMatchers() - assert.Equal(b, numSeries-numExpiresSeries, allActive) assert.True(b, valid) + allActive, _, _, _, _, _ = c.ActiveWithMatchers() + assert.Equal(b, numSeries-numExpiresSeries, allActive) if twice { valid = c.Purge(currentTime) - allActive, _ = c.ActiveWithMatchers() - assert.Equal(b, numSeries-numExpiresSeries, allActive) assert.True(b, valid) + allActive, _, _, _, _, _ = c.ActiveWithMatchers() + assert.Equal(b, numSeries-numExpiresSeries, allActive) } } } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7c39126107..f33aa7720d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -547,13 +547,23 @@ func (i *Ingester) updateActiveSeries(now time.Time) { // Active series config has been reloaded, exposing loading metric until MetricsIdleTimeout passes. i.metrics.activeSeriesLoading.WithLabelValues(userID).Set(1) } else { - allActive, activeMatching := userDB.activeSeries.ActiveWithMatchers() + allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := userDB.activeSeries.ActiveWithMatchers() i.metrics.activeSeriesLoading.DeleteLabelValues(userID) if allActive > 0 { i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(allActive)) } else { i.metrics.activeSeriesPerUser.DeleteLabelValues(userID) } + if allActiveHistograms > 0 { + i.metrics.activeSeriesPerUserNativeHistograms.WithLabelValues(userID).Set(float64(allActiveHistograms)) + } else { + i.metrics.activeSeriesPerUserNativeHistograms.DeleteLabelValues(userID) + } + if allActiveBuckets > 0 { + i.metrics.activeNativeHistogramBucketsPerUser.WithLabelValues(userID).Set(float64(allActiveBuckets)) + } else { + i.metrics.activeNativeHistogramBucketsPerUser.DeleteLabelValues(userID) + } for idx, name := range userDB.activeSeries.CurrentMatcherNames() { // We only set the metrics for matchers that actually exist, to avoid increasing cardinality with zero valued metrics. @@ -562,6 +572,16 @@ func (i *Ingester) updateActiveSeries(now time.Time) { } else { i.metrics.activeSeriesCustomTrackersPerUser.DeleteLabelValues(userID, name) } + if activeMatchingHistograms[idx] > 0 { + i.metrics.activeSeriesCustomTrackersPerUserNativeHistograms.WithLabelValues(userID, name).Set(float64(activeMatchingHistograms[idx])) + } else { + i.metrics.activeSeriesCustomTrackersPerUserNativeHistograms.DeleteLabelValues(userID, name) + } + if activeMatchingBuckets[idx] > 0 { + i.metrics.activeNativeHistogramBucketsCustomTrackersPerUser.WithLabelValues(userID, name).Set(float64(activeMatchingBuckets[idx])) + } else { + i.metrics.activeNativeHistogramBucketsCustomTrackersPerUser.DeleteLabelValues(userID, name) + } } } } @@ -1006,6 +1026,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return wrapWithUser(err, userID) } + numNativeHistogramBuckets := -1 if nativeHistogramsIngestionEnabled { for _, h := range ts.Histograms { var ( @@ -1045,10 +1066,24 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return wrapWithUser(err, userID) } + numNativeHistograms := len(ts.Histograms) + if numNativeHistograms > 0 { + lastNativeHistogram := ts.Histograms[numNativeHistograms-1] + numFloats := len(ts.Samples) + if numFloats == 0 || ts.Samples[numFloats-1].TimestampMs < lastNativeHistogram.Timestamp { + numNativeHistogramBuckets = 0 + for _, span := range lastNativeHistogram.PositiveSpans { + numNativeHistogramBuckets += int(span.Length) + } + for _, span := range lastNativeHistogram.NegativeSpans { + numNativeHistogramBuckets += int(span.Length) + } + } + } } if activeSeries != nil && stats.succeededSamplesCount > oldSucceededSamplesCount { - activeSeries.UpdateSeries(nonCopiedLabels, uint64(ref), startAppend) + activeSeries.UpdateSeries(nonCopiedLabels, uint64(ref), startAppend, numNativeHistogramBuckets) } if len(ts.Exemplars) > 0 && i.limits.MaxGlobalExemplarsPerUser(userID) > 0 { @@ -1438,7 +1473,7 @@ func createUserStats(db *userTSDB, req *client.UserStatsRequest) (*client.UserSt case client.IN_MEMORY: series = db.Head().NumSeries() case client.ACTIVE: - activeSeries := db.activeSeries.Active() + activeSeries, _, _ := db.activeSeries.Active() series = uint64(activeSeries) default: return nil, fmt.Errorf("unknown count method %q", req.GetCountMethod()) @@ -2533,7 +2568,8 @@ func (i *Ingester) compactBlocksToReduceInMemorySeries(ctx context.Context, now // Estimate the number of series that would be dropped from the TSDB Head if we would // compact the head up until "now - active series idle timeout". - estimatedSeriesReduction := util_math.Max(0, int64(userMemorySeries)-int64(db.activeSeries.Active())) + totalActiveSeries, _, _ := db.activeSeries.Active() + estimatedSeriesReduction := util_math.Max(0, int64(userMemorySeries)-int64(totalActiveSeries)) estimations = append(estimations, seriesReductionEstimation{ userID: userID, estimatedCount: estimatedSeriesReduction, diff --git a/pkg/ingester/ingester_early_compaction_test.go b/pkg/ingester/ingester_early_compaction_test.go index 1e00e7a147..c715e84d77 100644 --- a/pkg/ingester/ingester_early_compaction_test.go +++ b/pkg/ingester/ingester_early_compaction_test.go @@ -134,7 +134,8 @@ func TestIngester_compactBlocksToReduceInMemorySeries_ShouldTriggerCompactionOnl // Pre-condition check. require.Equal(t, uint64(10), ingester.getTSDB(userID).Head().NumSeries()) - require.Equal(t, 0, ingester.getTSDB(userID).activeSeries.Active()) + totalActiveSeries, _, _ := ingester.getTSDB(userID).activeSeries.Active() + require.Equal(t, 0, totalActiveSeries) // Push 20 more series. for seriesID := 10; seriesID < 30; seriesID++ { @@ -148,7 +149,8 @@ func TestIngester_compactBlocksToReduceInMemorySeries_ShouldTriggerCompactionOnl require.Len(t, listBlocksInDir(t, userBlocksDir), 0) require.Equal(t, uint64(30), ingester.getTSDB(userID).Head().NumSeries()) - require.Equal(t, 20, ingester.getTSDB(userID).activeSeries.Active()) + totalActiveSeries, _, _ = ingester.getTSDB(userID).activeSeries.Active() + require.Equal(t, 20, totalActiveSeries) // Advance time until the last series are inactive too. Now we expect the early compaction to trigger. now = now.Add(30 * time.Minute) @@ -157,7 +159,8 @@ func TestIngester_compactBlocksToReduceInMemorySeries_ShouldTriggerCompactionOnl require.Len(t, listBlocksInDir(t, userBlocksDir), 1) require.Equal(t, uint64(0), ingester.getTSDB(userID).Head().NumSeries()) - require.Equal(t, 0, ingester.getTSDB(userID).activeSeries.Active()) + totalActiveSeries, _, _ = ingester.getTSDB(userID).activeSeries.Active() + require.Equal(t, 0, totalActiveSeries) } func TestIngester_compactBlocksToReduceInMemorySeries_ShouldCompactHeadUpUntilNowMinusActiveSeriesMetricsIdleTimeout(t *testing.T) { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 0fbb92b69a..57cf42d85c 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -87,6 +87,8 @@ func TestIngester_Push(t *testing.T) { "cortex_ingester_memory_series_removed_total", "cortex_discarded_samples_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", + "cortex_ingester_active_native_histogram_buckets", } userID := "test" @@ -237,6 +239,12 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="test"} 8 `, nativeHistograms: true, }, @@ -1005,6 +1013,7 @@ func TestIngester_Push(t *testing.T) { func TestIngester_Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testing.T) { metricLabelAdapters := [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "test"}}} + metricLabelAdaptersHist := [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "test_histogram"}}} metricNames := []string{ "cortex_ingester_ingested_samples_total", "cortex_ingester_ingested_samples_failures_total", @@ -1013,6 +1022,8 @@ func TestIngester_Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testi "cortex_ingester_memory_series_created_total", "cortex_ingester_memory_series_removed_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", + "cortex_ingester_active_native_histogram_buckets", } registry := prometheus.NewRegistry() @@ -1047,6 +1058,10 @@ func TestIngester_Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testi nil, mimirpb.API, ), + mimirpb.NewWriteRequest(nil, mimirpb.API).AddHistogramSeries(metricLabelAdaptersHist, + []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(7, util_test.GenerateTestHistogram(1))}, nil), + mimirpb.NewWriteRequest(nil, mimirpb.API).AddHistogramSeries(metricLabelAdaptersHist, + []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(8, util_test.GenerateTestGaugeHistogram(2))}, nil), } for _, req := range reqs { @@ -1063,8 +1078,8 @@ func TestIngester_Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testi expectedMetrics := ` # HELP cortex_ingester_ingested_samples_total The total number of samples ingested per user. # TYPE cortex_ingester_ingested_samples_total counter - cortex_ingester_ingested_samples_total{user="test-1"} 2 - cortex_ingester_ingested_samples_total{user="test-2"} 2 + cortex_ingester_ingested_samples_total{user="test-1"} 4 + cortex_ingester_ingested_samples_total{user="test-2"} 4 # HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion per user. # TYPE cortex_ingester_ingested_samples_failures_total counter cortex_ingester_ingested_samples_failures_total{user="test-1"} 0 @@ -1074,19 +1089,27 @@ func TestIngester_Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testi cortex_ingester_memory_users 2 # HELP cortex_ingester_memory_series The current number of series in memory. # TYPE cortex_ingester_memory_series gauge - cortex_ingester_memory_series 2 + cortex_ingester_memory_series 4 # HELP cortex_ingester_memory_series_created_total The total number of series that were created per user. # TYPE cortex_ingester_memory_series_created_total counter - cortex_ingester_memory_series_created_total{user="test-1"} 1 - cortex_ingester_memory_series_created_total{user="test-2"} 1 + cortex_ingester_memory_series_created_total{user="test-1"} 2 + cortex_ingester_memory_series_created_total{user="test-2"} 2 # HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user. # TYPE cortex_ingester_memory_series_removed_total counter cortex_ingester_memory_series_removed_total{user="test-1"} 0 cortex_ingester_memory_series_removed_total{user="test-2"} 0 # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="test-1"} 1 - cortex_ingester_active_series{user="test-2"} 1 + cortex_ingester_active_series{user="test-1"} 2 + cortex_ingester_active_series{user="test-2"} 2 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test-1"} 1 + cortex_ingester_active_native_histogram_series{user="test-2"} 1 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="test-1"} 8 + cortex_ingester_active_native_histogram_buckets{user="test-2"} 8 ` assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)) @@ -1094,10 +1117,13 @@ func TestIngester_Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testi func TestIngester_Push_DecreaseInactiveSeries(t *testing.T) { metricLabelAdapters := [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "test"}}} + metricLabelAdaptersHist := [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "test_histogram"}}} metricNames := []string{ "cortex_ingester_memory_series_created_total", "cortex_ingester_memory_series_removed_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", + "cortex_ingester_active_native_histogram_buckets", } registry := prometheus.NewRegistry() @@ -1134,6 +1160,10 @@ func TestIngester_Push_DecreaseInactiveSeries(t *testing.T) { nil, mimirpb.API, ), + mimirpb.NewWriteRequest(nil, mimirpb.API).AddHistogramSeries(metricLabelAdaptersHist, + []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(7, util_test.GenerateTestHistogram(1))}, nil), + mimirpb.NewWriteRequest(nil, mimirpb.API).AddHistogramSeries(metricLabelAdaptersHist, + []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(8, util_test.GenerateTestGaugeHistogram(2))}, nil), } for _, req := range reqs { @@ -1152,8 +1182,8 @@ func TestIngester_Push_DecreaseInactiveSeries(t *testing.T) { expectedMetrics := ` # HELP cortex_ingester_memory_series_created_total The total number of series that were created per user. # TYPE cortex_ingester_memory_series_created_total counter - cortex_ingester_memory_series_created_total{user="test-1"} 1 - cortex_ingester_memory_series_created_total{user="test-2"} 1 + cortex_ingester_memory_series_created_total{user="test-1"} 2 + cortex_ingester_memory_series_created_total{user="test-2"} 2 # HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user. # TYPE cortex_ingester_memory_series_removed_total counter cortex_ingester_memory_series_removed_total{user="test-1"} 0 @@ -6236,6 +6266,12 @@ func TestIngesterActiveSeries(t *testing.T) { {{Name: labels.MetricName, Value: "test_metric"}, {Name: "bool", Value: "true"}, {Name: "team", Value: "a"}}, {{Name: labels.MetricName, Value: "test_metric"}, {Name: "bool", Value: "true"}, {Name: "team", Value: "b"}}, } + labelsToPushHist := [][]mimirpb.LabelAdapter{ + {{Name: labels.MetricName, Value: "test_histogram_metric"}, {Name: "bool", Value: "false"}, {Name: "team", Value: "a"}}, + {{Name: labels.MetricName, Value: "test_histogram_metric"}, {Name: "bool", Value: "false"}, {Name: "team", Value: "b"}}, + {{Name: labels.MetricName, Value: "test_histogram_metric"}, {Name: "bool", Value: "true"}, {Name: "team", Value: "a"}}, + {{Name: labels.MetricName, Value: "test_histogram_metric"}, {Name: "bool", Value: "true"}, {Name: "team", Value: "b"}}, + } req := func(lbls []mimirpb.LabelAdapter, t time.Time) *mimirpb.WriteRequest { return mimirpb.ToWriteRequest( @@ -6246,10 +6282,18 @@ func TestIngesterActiveSeries(t *testing.T) { mimirpb.API, ) } + reqHist := func(lbls []mimirpb.LabelAdapter, t time.Time) *mimirpb.WriteRequest { + return mimirpb.NewWriteRequest(nil, mimirpb.API).AddHistogramSeries([][]mimirpb.LabelAdapter{lbls}, + []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(t.UnixMilli(), util_test.GenerateTestGaugeHistogram(1))}, nil) + } metricNames := []string{ "cortex_ingester_active_series", "cortex_ingester_active_series_custom_tracker", + "cortex_ingester_active_native_histogram_series", + "cortex_ingester_active_native_histogram_series_custom_tracker", + "cortex_ingester_active_native_histogram_buckets", + "cortex_ingester_active_native_histogram_buckets_custom_tracker", } userID := "test_user" userID2 := "other_test_user" @@ -6265,7 +6309,7 @@ func TestIngesterActiveSeries(t *testing.T) { }) activeSeriesTenantOverride := new(TenantLimitsMock) - activeSeriesTenantOverride.On("ByUserID", userID).Return(&validation.Limits{ActiveSeriesCustomTrackersConfig: activeSeriesTenantConfig}) + activeSeriesTenantOverride.On("ByUserID", userID).Return(&validation.Limits{ActiveSeriesCustomTrackersConfig: activeSeriesTenantConfig, NativeHistogramsIngestionEnabled: true}) activeSeriesTenantOverride.On("ByUserID", userID2).Return(nil) tests := map[string]struct { @@ -6278,6 +6322,8 @@ func TestIngesterActiveSeries(t *testing.T) { test: func(t *testing.T, ingester *Ingester, gatherer prometheus.Gatherer) { pushWithUser(t, ingester, labelsToPush, userID, req) pushWithUser(t, ingester, labelsToPush, userID2, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) + pushWithUser(t, ingester, labelsToPushHist, userID2, reqHist) // Update active series for metrics check. ingester.updateActiveSeries(time.Now()) @@ -6285,14 +6331,34 @@ func TestIngesterActiveSeries(t *testing.T) { expectedMetrics := ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="other_test_user"} 4 - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="other_test_user"} 8 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="other_test_user"} 4 + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_a",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_b",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="other_test_user"} 32 + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_a",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_b",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 16 ` // Check tracked Prometheus metrics @@ -6303,6 +6369,8 @@ func TestIngesterActiveSeries(t *testing.T) { test: func(t *testing.T, ingester *Ingester, gatherer prometheus.Gatherer) { pushWithUser(t, ingester, labelsToPush, userID, req) pushWithUser(t, ingester, labelsToPush, userID2, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) + pushWithUser(t, ingester, labelsToPushHist, userID2, reqHist) // Update active series for metrics check. ingester.updateActiveSeries(time.Now()) @@ -6310,14 +6378,34 @@ func TestIngesterActiveSeries(t *testing.T) { expectedMetrics := ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="other_test_user"} 4 - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="other_test_user"} 8 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="other_test_user"} 4 + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_a",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_b",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="other_test_user"} 32 + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_a",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_b",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 16 ` // Check tracked Prometheus metrics @@ -6333,6 +6421,8 @@ func TestIngesterActiveSeries(t *testing.T) { currentTime := time.Now() pushWithUser(t, ingester, labelsToPush, userID, req) pushWithUser(t, ingester, labelsToPush, userID2, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) + pushWithUser(t, ingester, labelsToPushHist, userID2, reqHist) // Update active series for metrics check. ingester.updateActiveSeries(currentTime) @@ -6340,14 +6430,34 @@ func TestIngesterActiveSeries(t *testing.T) { expectedMetrics := ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="other_test_user"} 4 - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="other_test_user"} 8 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="other_test_user"} 4 + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_a",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_b",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="other_test_user"} 32 + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_a",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_b",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 16 ` // Check tracked Prometheus metrics @@ -6356,6 +6466,7 @@ func TestIngesterActiveSeries(t *testing.T) { // Pushing second time to have entires which are not going to be purged currentTime = time.Now() pushWithUser(t, ingester, labelsToPush, userID, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) // Adding time to make the first batch of pushes idle. // We update them in the exact moment in time where append time of the first push is already considered idle, @@ -6366,11 +6477,25 @@ func TestIngesterActiveSeries(t *testing.T) { expectedMetrics = ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_a",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_b",user="test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_a",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_b",user="test_user"} 16 ` // Check tracked Prometheus metrics @@ -6387,6 +6512,8 @@ func TestIngesterActiveSeries(t *testing.T) { test: func(t *testing.T, ingester *Ingester, gatherer prometheus.Gatherer) { pushWithUser(t, ingester, labelsToPush, userID, req) pushWithUser(t, ingester, labelsToPush, userID2, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) + pushWithUser(t, ingester, labelsToPushHist, userID2, reqHist) // Update active series for metrics check. ingester.updateActiveSeries(time.Now()) @@ -6409,6 +6536,7 @@ func TestIngesterActiveSeries(t *testing.T) { limits := defaultLimitsTestConfig() limits.ActiveSeriesCustomTrackersConfig = activeSeriesDefaultConfig + limits.NativeHistogramsIngestionEnabled = true overrides, err := validation.NewOverrides(limits, activeSeriesTenantOverride) require.NoError(t, err) @@ -6434,6 +6562,12 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { {{Name: labels.MetricName, Value: "test_metric"}, {Name: "bool", Value: "true"}, {Name: "team", Value: "a"}}, {{Name: labels.MetricName, Value: "test_metric"}, {Name: "bool", Value: "true"}, {Name: "team", Value: "b"}}, } + labelsToPushHist := [][]mimirpb.LabelAdapter{ + {{Name: labels.MetricName, Value: "test_histogram_metric"}, {Name: "bool", Value: "false"}, {Name: "team", Value: "a"}}, + {{Name: labels.MetricName, Value: "test_histogram_metric"}, {Name: "bool", Value: "false"}, {Name: "team", Value: "b"}}, + {{Name: labels.MetricName, Value: "test_histogram_metric"}, {Name: "bool", Value: "true"}, {Name: "team", Value: "a"}}, + {{Name: labels.MetricName, Value: "test_histogram_metric"}, {Name: "bool", Value: "true"}, {Name: "team", Value: "b"}}, + } req := func(lbls []mimirpb.LabelAdapter, t time.Time) *mimirpb.WriteRequest { return mimirpb.ToWriteRequest( @@ -6444,11 +6578,19 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { mimirpb.API, ) } + reqHist := func(lbls []mimirpb.LabelAdapter, t time.Time) *mimirpb.WriteRequest { + return mimirpb.NewWriteRequest(nil, mimirpb.API).AddHistogramSeries([][]mimirpb.LabelAdapter{lbls}, + []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(t.UnixMilli(), util_test.GenerateTestGaugeHistogram(1))}, nil) + } metricNames := []string{ "cortex_ingester_active_series_loading", "cortex_ingester_active_series", "cortex_ingester_active_series_custom_tracker", + "cortex_ingester_active_native_histogram_series", + "cortex_ingester_active_native_histogram_series_custom_tracker", + "cortex_ingester_active_native_histogram_buckets", + "cortex_ingester_active_native_histogram_buckets_custom_tracker", } userID := "test_user" userID2 := "other_test_user" @@ -6465,7 +6607,7 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { defaultActiveSeriesTenantOverride := new(TenantLimitsMock) defaultActiveSeriesTenantOverride.On("ByUserID", userID2).Return(nil) - defaultActiveSeriesTenantOverride.On("ByUserID", userID).Return(&validation.Limits{ActiveSeriesCustomTrackersConfig: activeSeriesTenantConfig}) + defaultActiveSeriesTenantOverride.On("ByUserID", userID).Return(&validation.Limits{ActiveSeriesCustomTrackersConfig: activeSeriesTenantConfig, NativeHistogramsIngestionEnabled: true}) tests := map[string]struct { test func(t *testing.T, ingester *Ingester, gatherer prometheus.Gatherer) @@ -6482,6 +6624,8 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { pushWithUser(t, ingester, labelsToPush, userID, req) pushWithUser(t, ingester, labelsToPush, userID2, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) + pushWithUser(t, ingester, labelsToPushHist, userID2, reqHist) // Update active series for metrics check. ingester.updateActiveSeries(currentTime) @@ -6489,14 +6633,34 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { expectedMetrics := ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="other_test_user"} 4 - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="other_test_user"} 8 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="other_test_user"} 4 + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="other_test_user"} 32 + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 16 ` // Check tracked Prometheus metrics require.NoError(t, testutil.GatherAndCompare(gatherer, strings.NewReader(expectedMetrics), metricNames...)) @@ -6504,9 +6668,10 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { // Add new runtime configs activeSeriesTenantOverride := new(TenantLimitsMock) activeSeriesTenantOverride.On("ByUserID", userID2).Return(nil) - activeSeriesTenantOverride.On("ByUserID", userID).Return(&validation.Limits{ActiveSeriesCustomTrackersConfig: activeSeriesTenantConfig}) + activeSeriesTenantOverride.On("ByUserID", userID).Return(&validation.Limits{ActiveSeriesCustomTrackersConfig: activeSeriesTenantConfig, NativeHistogramsIngestionEnabled: true}) limits := defaultLimitsTestConfig() limits.ActiveSeriesCustomTrackersConfig = activeSeriesDefaultConfig + limits.NativeHistogramsIngestionEnabled = true override, err := validation.NewOverrides(limits, activeSeriesTenantOverride) require.NoError(t, err) ingester.limits = override @@ -6516,14 +6681,28 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { expectedMetrics = ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="other_test_user"} 4 + cortex_ingester_active_series{user="other_test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 4 # HELP cortex_ingester_active_series_loading Indicates that active series configuration is being reloaded, and waiting to become stable. While this metric is non zero, values from active series metrics shouldn't be considered. # TYPE cortex_ingester_active_series_loading gauge cortex_ingester_active_series_loading{user="test_user"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="other_test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="other_test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 16 ` require.NoError(t, testutil.GatherAndCompare(gatherer, strings.NewReader(expectedMetrics), metricNames...)) @@ -6531,20 +6710,42 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { currentTime = time.Now() pushWithUser(t, ingester, labelsToPush, userID, req) pushWithUser(t, ingester, labelsToPush, userID2, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) + pushWithUser(t, ingester, labelsToPushHist, userID2, reqHist) // Adding idleTimeout to expose the metrics but not purge the pushes. currentTime = currentTime.Add(ingester.cfg.ActiveSeriesMetrics.IdleTimeout) ingester.updateActiveSeries(currentTime) expectedMetrics = ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="other_test_user"} 4 - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="other_test_user"} 8 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="other_test_user"} 4 + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_a",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_b",user="test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="other_test_user"} 32 + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_a",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_b",user="test_user"} 16 ` require.NoError(t, testutil.GatherAndCompare(gatherer, strings.NewReader(expectedMetrics), metricNames...)) }, @@ -6557,6 +6758,8 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { pushWithUser(t, ingester, labelsToPush, userID, req) pushWithUser(t, ingester, labelsToPush, userID2, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) + pushWithUser(t, ingester, labelsToPushHist, userID2, reqHist) // Update active series for metrics check. ingester.updateActiveSeries(currentTime) @@ -6564,14 +6767,34 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { expectedMetrics := ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="other_test_user"} 4 - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="other_test_user"} 8 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="other_test_user"} 4 + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_a",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_b",user="test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="other_test_user"} 32 + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_a",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_b",user="test_user"} 16 ` // Check tracked Prometheus metrics require.NoError(t, testutil.GatherAndCompare(gatherer, strings.NewReader(expectedMetrics), metricNames...)) @@ -6579,6 +6802,7 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { // Remove runtime configs limits := defaultLimitsTestConfig() limits.ActiveSeriesCustomTrackersConfig = activeSeriesDefaultConfig + limits.NativeHistogramsIngestionEnabled = true override, err := validation.NewOverrides(limits, nil) require.NoError(t, err) ingester.limits = override @@ -6586,11 +6810,25 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { expectedMetrics = ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="other_test_user"} 4 + cortex_ingester_active_series{user="other_test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="other_test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="other_test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 16 # HELP cortex_ingester_active_series_loading Indicates that active series configuration is being reloaded, and waiting to become stable. While this metric is non zero, values from active series metrics shouldn't be considered. # TYPE cortex_ingester_active_series_loading gauge cortex_ingester_active_series_loading{user="test_user"} 1 @@ -6601,20 +6839,42 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { currentTime = time.Now() pushWithUser(t, ingester, labelsToPush, userID, req) pushWithUser(t, ingester, labelsToPush, userID2, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) + pushWithUser(t, ingester, labelsToPushHist, userID2, reqHist) // Adding idleTimeout to expose the metrics but not purge the pushes. currentTime = currentTime.Add(ingester.cfg.ActiveSeriesMetrics.IdleTimeout) ingester.updateActiveSeries(currentTime) expectedMetrics = ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="other_test_user"} 4 - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="other_test_user"} 8 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="other_test_user"} 4 + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="other_test_user"} 32 + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 16 ` require.NoError(t, testutil.GatherAndCompare(gatherer, strings.NewReader(expectedMetrics), metricNames...)) }, @@ -6625,6 +6885,7 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { currentTime := time.Now() pushWithUser(t, ingester, labelsToPush, userID, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) // Update active series for metrics check. ingester.updateActiveSeries(currentTime) @@ -6632,11 +6893,25 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { expectedMetrics := ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="test_user"} 16 ` // Check tracked Prometheus metrics require.NoError(t, testutil.GatherAndCompare(gatherer, strings.NewReader(expectedMetrics), metricNames...)) @@ -6648,9 +6923,10 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { "team_b": `{team="b"}`, "team_c": `{team="b"}`, "team_d": `{team="b"}`, - })}) + }), NativeHistogramsIngestionEnabled: true}) limits := defaultLimitsTestConfig() limits.ActiveSeriesCustomTrackersConfig = activeSeriesDefaultConfig + limits.NativeHistogramsIngestionEnabled = true override, err := validation.NewOverrides(limits, activeSeriesTenantOverride) require.NoError(t, err) ingester.limits = override @@ -6665,19 +6941,39 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { // Saving time before second push to avoid purging it before exposing. currentTime = time.Now() pushWithUser(t, ingester, labelsToPush, userID, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) // Adding idleTimeout to expose the metrics but not purge the pushes. currentTime = currentTime.Add(ingester.cfg.ActiveSeriesMetrics.IdleTimeout) ingester.updateActiveSeries(currentTime) expectedMetrics = ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_c",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_d",user="test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_c",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_d",user="test_user"} 4 + + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_a",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_b",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_c",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_d",user="test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_a",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_b",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_c",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_d",user="test_user"} 16 ` require.NoError(t, testutil.GatherAndCompare(gatherer, strings.NewReader(expectedMetrics), metricNames...)) }, @@ -6690,6 +6986,8 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { pushWithUser(t, ingester, labelsToPush, userID, req) pushWithUser(t, ingester, labelsToPush, userID2, req) + pushWithUser(t, ingester, labelsToPushHist, userID, reqHist) + pushWithUser(t, ingester, labelsToPushHist, userID2, reqHist) // Update active series for metrics check. ingester.updateActiveSeries(currentTime) @@ -6697,14 +6995,34 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { expectedMetrics := ` # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="other_test_user"} 4 - cortex_ingester_active_series{user="test_user"} 4 + cortex_ingester_active_series{user="other_test_user"} 8 + cortex_ingester_active_series{user="test_user"} 8 # HELP cortex_ingester_active_series_custom_tracker Number of currently active series matching a pre-configured label matchers per user. # TYPE cortex_ingester_active_series_custom_tracker gauge - cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 2 - cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 2 + cortex_ingester_active_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_a",user="test_user"} 4 + cortex_ingester_active_series_custom_tracker{name="team_b",user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="other_test_user"} 4 + cortex_ingester_active_native_histogram_series{user="test_user"} 4 + # HELP cortex_ingester_active_native_histogram_series_custom_tracker Number of currently active native histogram series matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_series_custom_tracker gauge + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_a",user="test_user"} 2 + cortex_ingester_active_native_histogram_series_custom_tracker{name="team_b",user="test_user"} 2 + # HELP cortex_ingester_active_native_histogram_buckets Number of currently active native histogram buckets per user. + # TYPE cortex_ingester_active_native_histogram_buckets gauge + cortex_ingester_active_native_histogram_buckets{user="other_test_user"} 32 + cortex_ingester_active_native_histogram_buckets{user="test_user"} 32 + # HELP cortex_ingester_active_native_histogram_buckets_custom_tracker Number of currently active native histogram buckets matching a pre-configured label matchers per user. + # TYPE cortex_ingester_active_native_histogram_buckets_custom_tracker gauge + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_true_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="bool_is_false_flagbased",user="other_test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_a",user="test_user"} 16 + cortex_ingester_active_native_histogram_buckets_custom_tracker{name="team_b",user="test_user"} 16 ` // Check tracked Prometheus metrics require.NoError(t, testutil.GatherAndCompare(gatherer, strings.NewReader(expectedMetrics), metricNames...)) @@ -6740,6 +7058,7 @@ func TestIngesterActiveSeriesConfigChanges(t *testing.T) { limits := defaultLimitsTestConfig() limits.ActiveSeriesCustomTrackersConfig = testData.activeSeriesConfig + limits.NativeHistogramsIngestionEnabled = true var overrides *validation.Overrides var err error // Without this, TenantLimitsMock(nil) != nil when using getOverridesForUser in limits.go diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index fbbbbd2691..370567b078 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -34,9 +34,13 @@ type ingesterMetrics struct { memMetadataCreatedTotal *prometheus.CounterVec memMetadataRemovedTotal *prometheus.CounterVec - activeSeriesLoading *prometheus.GaugeVec - activeSeriesPerUser *prometheus.GaugeVec - activeSeriesCustomTrackersPerUser *prometheus.GaugeVec + activeSeriesLoading *prometheus.GaugeVec + activeSeriesPerUser *prometheus.GaugeVec + activeSeriesCustomTrackersPerUser *prometheus.GaugeVec + activeSeriesPerUserNativeHistograms *prometheus.GaugeVec + activeSeriesCustomTrackersPerUserNativeHistograms *prometheus.GaugeVec + activeNativeHistogramBucketsPerUser *prometheus.GaugeVec + activeNativeHistogramBucketsCustomTrackersPerUser *prometheus.GaugeVec // Global limit metrics maxUsersGauge prometheus.GaugeFunc @@ -254,6 +258,30 @@ func newIngesterMetrics( Help: "Number of currently active series matching a pre-configured label matchers per user.", }, []string{"user", "name"}), + // Not registered automatically, but only if activeSeriesEnabled is true. + activeSeriesPerUserNativeHistograms: promauto.With(activeSeriesReg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_native_histogram_series", + Help: "Number of currently active native histogram series per user.", + }, []string{"user"}), + + // Not registered automatically, but only if activeSeriesEnabled is true. + activeSeriesCustomTrackersPerUserNativeHistograms: promauto.With(activeSeriesReg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_native_histogram_series_custom_tracker", + Help: "Number of currently active native histogram series matching a pre-configured label matchers per user.", + }, []string{"user", "name"}), + + // Not registered automatically, but only if activeSeriesEnabled is true. + activeNativeHistogramBucketsPerUser: promauto.With(activeSeriesReg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_native_histogram_buckets", + Help: "Number of currently active native histogram buckets per user.", + }, []string{"user"}), + + // Not registered automatically, but only if activeSeriesEnabled is true. + activeNativeHistogramBucketsCustomTrackersPerUser: promauto.With(activeSeriesReg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_native_histogram_buckets_custom_tracker", + Help: "Number of currently active native histogram buckets matching a pre-configured label matchers per user.", + }, []string{"user", "name"}), + compactionsTriggered: promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_tsdb_compactions_triggered_total", Help: "Total number of triggered compactions.", @@ -315,8 +343,12 @@ func (m *ingesterMetrics) deletePerGroupMetricsForUser(userID, group string) { func (m *ingesterMetrics) deletePerUserCustomTrackerMetrics(userID string, customTrackerMetrics []string) { m.activeSeriesLoading.DeleteLabelValues(userID) m.activeSeriesPerUser.DeleteLabelValues(userID) + m.activeSeriesPerUserNativeHistograms.DeleteLabelValues(userID) + m.activeNativeHistogramBucketsPerUser.DeleteLabelValues(userID) for _, name := range customTrackerMetrics { m.activeSeriesCustomTrackersPerUser.DeleteLabelValues(userID, name) + m.activeSeriesCustomTrackersPerUserNativeHistograms.DeleteLabelValues(userID, name) + m.activeNativeHistogramBucketsCustomTrackersPerUser.DeleteLabelValues(userID, name) } }