Skip to content

Commit

Permalink
Add histogram billing metrics to ingester (#5318)
Browse files Browse the repository at this point in the history
* Add new metrics to ingester for native histograms billing

* Track native histogram buckets as new metrics in ingester for billing

* Update changes to active native histogram series

* Move valid assertions closer to purge according to review

* Update according to code review

* Update CHANGELOG

* Combine hasNativeHistograms and numNativeHistogramBuckets

* Fix conflicts with new code

* Apply suggestions from code review

Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>

---------

Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
  • Loading branch information
zenador and krajorama authored Jul 11, 2023
1 parent 0a1382f commit a9f4bdf
Show file tree
Hide file tree
Showing 8 changed files with 869 additions and 227 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [FEATURE] Ruler: Support filtering results from rule status endpoint by `file`, `rule_group` and `rule_name`. #5291
* [FEATURE] Ingester: add experimental support for creating tokens by using spread minimizing strategy. This can be enabled with `-ingester.ring.token-generation-strategy: spread-minimizing` and `-ingester.ring.spread-minimizing-zones: <all available zones>`. In that case `-ingester.ring.tokens-file-path` must be empty. #5308 #5324
* [FEATURE] Ingester: add experimental support to compact the TSDB Head when the number of in-memory series is equal or greater than `-blocks-storage.tsdb.early-head-compaction-min-in-memory-series`, and the ingester estimates that the per-tenant TSDB Head compaction will reduce in-memory series by at least `-blocks-storage.tsdb.early-head-compaction-min-estimated-series-reduction-percentage`. #5371
* [FEATURE] Ingester: add new metrics for tracking native histograms in active series: `cortex_ingester_active_native_histogram_series`, `cortex_ingester_active_native_histogram_series_custom_tracker`, `cortex_ingester_active_native_histogram_buckets`, `cortex_ingester_active_native_histogram_buckets_custom_tracker`. The first two are subsets of the existing, unmodified `cortex_ingester_active_series` and `cortex_ingester_active_series_custom_tracker` metrics respectively, counting only native histogram series; the last two are their equivalents that track the number of buckets in those native histogram series. #5318
* [ENHANCEMENT] Overrides-exporter: Add new metrics for write path and alertmanager (`max_global_metadata_per_user`, `max_global_metadata_per_metric`, `request_rate`, `request_burst_size`, `alertmanager_notification_rate_limit`, `alertmanager_max_dispatcher_aggregation_groups`, `alertmanager_max_alerts_count`, `alertmanager_max_alerts_size_bytes`) and added flag `-overrides-exporter.enabled-metrics` to explicitly configure desired metrics, e.g. `-overrides-exporter.enabled-metrics=request_rate,ingestion_rate`. Default value for this flag is: `ingestion_rate,ingestion_burst_size,max_global_series_per_user,max_global_series_per_metric,max_global_exemplars_per_user,max_fetched_chunks_per_query,max_fetched_series_per_query,ruler_max_rules_per_rule_group,ruler_max_rule_groups_per_tenant`. #5376
* [ENHANCEMENT] Cardinality API: When zone aware replication is enabled, the label values cardinality API can now tolerate single zone failure #5178
* [ENHANCEMENT] Distributor: optimize sending requests to ingesters when incoming requests don't need to be modified. #5137 #5389
Expand Down
12 changes: 6 additions & 6 deletions pkg/ingester/activeseries/active_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ func TestPostings_Expand(t *testing.T) {

// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0))
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0), -1)
}

valid := activeSeries.Purge(mockedTime)
allActive, _ := activeSeries.ActiveWithMatchers()
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 2, allActive)

Expand Down Expand Up @@ -60,11 +60,11 @@ func TestPostings_Seek(t *testing.T) {

// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0))
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0), -1)
}

valid := activeSeries.Purge(mockedTime)
allActive, _ := activeSeries.ActiveWithMatchers()
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 2, allActive)

Expand Down Expand Up @@ -92,11 +92,11 @@ func TestPostings_SeekToEnd(t *testing.T) {

// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0))
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0), -1)
}

valid := activeSeries.Purge(mockedTime)
allActive, _ := activeSeries.ActiveWithMatchers()
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 0, allActive)

Expand Down
161 changes: 125 additions & 36 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,21 @@ type seriesStripe struct {
// without holding the lock -- hence the atomic).
oldestEntryTs atomic.Int64

mu sync.RWMutex
refs map[uint64]seriesEntry
active int // Number of active entries in this stripe. Only decreased during purge or clear.
activeMatching []int // Number of active entries in this stripe matching each matcher of the configured Matchers.
mu sync.RWMutex
refs map[uint64]seriesEntry
active int // Number of active entries in this stripe. Only decreased during purge or clear.
activeMatching []int // Number of active entries in this stripe matching each matcher of the configured Matchers.
activeNativeHistograms int // Number of active entries (only native histograms) in this stripe. Only decreased during purge or clear.
activeMatchingNativeHistograms []int // Number of active entries (only native histograms) in this stripe matching each matcher of the configured Matchers.
activeNativeHistogramBuckets int // Number of buckets in active native histogram entries in this stripe. Only decreased during purge or clear.
activeMatchingNativeHistogramBuckets []int // Number of buckets in active native histogram entries in this stripe matching each matcher of the configured Matchers.
}

// seriesEntry holds the timestamp, the matching matcher indexes and the native histogram bucket count of a single series.
type seriesEntry struct {
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches preAllocDynamicSlice // Index of the matcher matching
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches preAllocDynamicSlice // Index of the matcher matching
numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram.
}

func NewActiveSeries(asm *Matchers, timeout time.Duration) *ActiveSeries {
Expand Down Expand Up @@ -104,10 +109,11 @@ func (c *ActiveSeries) CurrentConfig() CustomTrackersConfig {
}

// UpdateSeries updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet.
func (c *ActiveSeries) UpdateSeries(series labels.Labels, ref uint64, now time.Time) {
// Pass -1 in numNativeHistogramBuckets if the series is not a native histogram series.
func (c *ActiveSeries) UpdateSeries(series labels.Labels, ref uint64, now time.Time, numNativeHistogramBuckets int) {
stripeID := ref % numStripes

c.stripes[stripeID].updateSeriesTimestamp(now, series, ref)
c.stripes[stripeID].updateSeriesTimestamp(now, series, ref, numNativeHistogramBuckets)
}

// Purge purges expired entries and returns true if enough time has passed since
Expand All @@ -134,31 +140,41 @@ func (c *ActiveSeries) ContainsRef(ref uint64) bool {
return c.stripes[stripeID].containsRef(ref)
}

// Active returns the total number of active series. This method does not purge
// expired entries, so Purge should be called periodically.
func (c *ActiveSeries) Active() int {
total := 0
// Active returns the total numbers of active series, active native
// histogram series, and buckets of those native histogram series.
// This method does not purge expired entries, so Purge should be
// called periodically.
func (c *ActiveSeries) Active() (total, totalNativeHistograms, totalNativeHistogramBuckets int) {
for s := 0; s < numStripes; s++ {
total += c.stripes[s].getTotal()
all, histograms, buckets := c.stripes[s].getTotal()
total += all
totalNativeHistograms += histograms
totalNativeHistogramBuckets += buckets
}
return total
return
}

// ActiveWithMatchers returns the total number of active series, as well as a
// slice of active series matching each one of the custom trackers provided (in
// the same order as custom trackers are defined). This method does not purge
// the same order as custom trackers are defined), and then the same thing for
// only active series that are native histograms, then the same for the number
// of buckets in those active native histogram series. This method does not purge
// expired entries, so Purge should be called periodically.
func (c *ActiveSeries) ActiveWithMatchers() (int, []int) {
func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, totalNativeHistograms int, totalMatchingNativeHistograms []int, totalNativeHistogramBuckets int, totalMatchingNativeHistogramBuckets []int) {
c.matchersMutex.RLock()
defer c.matchersMutex.RUnlock()

total := 0
totalMatching := resizeAndClear(len(c.matchers.MatcherNames()), nil)
totalMatching = resizeAndClear(len(c.matchers.MatcherNames()), nil)
totalMatchingNativeHistograms = resizeAndClear(len(c.matchers.MatcherNames()), nil)
totalMatchingNativeHistogramBuckets = resizeAndClear(len(c.matchers.MatcherNames()), nil)
for s := 0; s < numStripes; s++ {
total += c.stripes[s].getTotalAndUpdateMatching(totalMatching)
all, histograms, buckets := c.stripes[s].getTotalAndUpdateMatching(totalMatching, totalMatchingNativeHistograms, totalMatchingNativeHistogramBuckets)
total += all
totalNativeHistograms += histograms
totalNativeHistogramBuckets += buckets
}

return total, totalMatching
return
}

func (s *seriesStripe) containsRef(ref uint64) bool {
Expand All @@ -170,33 +186,42 @@ func (s *seriesStripe) containsRef(ref uint64) bool {
}

// getTotal will return the total active series in the stripe
func (s *seriesStripe) getTotal() int {
// as well as the number of active series that are native histograms
// and the total number of buckets in those active native histogram series.
func (s *seriesStripe) getTotal() (int, int, int) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.active
return s.active, s.activeNativeHistograms, s.activeNativeHistogramBuckets
}

// getTotalAndUpdateMatching will return the total active series in the stripe and also update the slice provided
// with each matcher's total.
func (s *seriesStripe) getTotalAndUpdateMatching(matching []int) int {
// with each matcher's total, and will also do the same for total active series that are native histograms
// as well as the total number of buckets in active native histogram series
func (s *seriesStripe) getTotalAndUpdateMatching(matching []int, matchingNativeHistograms []int, matchingNativeHistogramBuckets []int) (int, int, int) {
s.mu.RLock()
defer s.mu.RUnlock()

// len(matching) == len(s.activeMatching) by design, and it could be nil
for i, a := range s.activeMatching {
matching[i] += a
}
for i, a := range s.activeMatchingNativeHistograms {
matchingNativeHistograms[i] += a
}
for i, a := range s.activeMatchingNativeHistogramBuckets {
matchingNativeHistogramBuckets[i] += a
}

return s.active
return s.active, s.activeNativeHistograms, s.activeNativeHistogramBuckets
}

func (s *seriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, ref uint64) {
func (s *seriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, ref uint64, numNativeHistogramBuckets int) {
nowNanos := now.UnixNano()

e := s.findEntryForSeries(ref)
e, needsUpdating := s.findEntryForSeries(ref, numNativeHistogramBuckets)
entryTimeSet := false
if e == nil {
e, entryTimeSet = s.findOrCreateEntryForSeries(ref, series, nowNanos)
if e == nil || needsUpdating {
e, entryTimeSet = s.findAndUpdateOrCreateEntryForSeries(ref, series, nowNanos, numNativeHistogramBuckets)
}

if !entryTimeSet {
Expand All @@ -216,34 +241,77 @@ func (s *seriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels
}
}

func (s *seriesStripe) findEntryForSeries(ref uint64) *atomic.Int64 {
func (s *seriesStripe) findEntryForSeries(ref uint64, numNativeHistogramBuckets int) (*atomic.Int64, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.refs[ref].nanos
entry := s.refs[ref]
return entry.nanos, entry.numNativeHistogramBuckets != numNativeHistogramBuckets
}

func (s *seriesStripe) findOrCreateEntryForSeries(ref uint64, series labels.Labels, nowNanos int64) (*atomic.Int64, bool) {
func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref uint64, series labels.Labels, nowNanos int64, numNativeHistogramBuckets int) (*atomic.Int64, bool) {
s.mu.Lock()
defer s.mu.Unlock()

// Check if already exists within the entries.
// This repeats findEntryForSeries(), but under write lock.
entry, ok := s.refs[ref]
if ok {
if entry.numNativeHistogramBuckets != numNativeHistogramBuckets {
matches := s.matchers.matches(series)
matchesLen := matches.len()
if numNativeHistogramBuckets >= 0 && entry.numNativeHistogramBuckets >= 0 {
// change number of buckets but still a histogram
diff := numNativeHistogramBuckets - entry.numNativeHistogramBuckets
s.activeNativeHistogramBuckets += diff
for i := 0; i < matchesLen; i++ {
s.activeMatchingNativeHistogramBuckets[matches.get(i)] += diff
}
} else if numNativeHistogramBuckets >= 0 {
// change from float to histogram
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += numNativeHistogramBuckets
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
s.activeMatchingNativeHistograms[match]++
s.activeMatchingNativeHistogramBuckets[match] += numNativeHistogramBuckets
}
} else {
// change from histogram to float
s.activeNativeHistograms--
s.activeNativeHistogramBuckets -= entry.numNativeHistogramBuckets
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
s.activeMatchingNativeHistograms[match]--
s.activeMatchingNativeHistogramBuckets[match] -= entry.numNativeHistogramBuckets
}
}
entry.numNativeHistogramBuckets = numNativeHistogramBuckets
s.refs[ref] = entry
}
return entry.nanos, false
}

matches := s.matchers.matches(series)
matchesLen := matches.len()

s.active++
if numNativeHistogramBuckets >= 0 {
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += numNativeHistogramBuckets
}
for i := 0; i < matchesLen; i++ {
s.activeMatching[matches.get(i)]++
match := matches.get(i)
s.activeMatching[match]++
if numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]++
s.activeMatchingNativeHistogramBuckets[match] += numNativeHistogramBuckets
}
}

e := seriesEntry{
nanos: atomic.NewInt64(nowNanos),
matches: matches,
nanos: atomic.NewInt64(nowNanos),
matches: matches,
numNativeHistogramBuckets: numNativeHistogramBuckets,
}

s.refs[ref] = e
Expand All @@ -259,8 +327,12 @@ func (s *seriesStripe) clear() {
s.oldestEntryTs.Store(0)
s.refs = map[uint64]seriesEntry{}
s.active = 0
s.activeNativeHistograms = 0
s.activeNativeHistogramBuckets = 0
for i := range s.activeMatching {
s.activeMatching[i] = 0
s.activeMatchingNativeHistograms[i] = 0
s.activeMatchingNativeHistogramBuckets[i] = 0
}
}

Expand All @@ -272,8 +344,12 @@ func (s *seriesStripe) reinitialize(asm *Matchers) {
s.oldestEntryTs.Store(0)
s.refs = map[uint64]seriesEntry{}
s.active = 0
s.activeNativeHistograms = 0
s.activeNativeHistogramBuckets = 0
s.matchers = asm
s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching)
s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
}

func (s *seriesStripe) purge(keepUntil time.Time) {
Expand All @@ -287,7 +363,11 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
defer s.mu.Unlock()

s.active = 0
s.activeNativeHistograms = 0
s.activeNativeHistogramBuckets = 0
s.activeMatching = resizeAndClear(len(s.activeMatching), s.activeMatching)
s.activeMatchingNativeHistograms = resizeAndClear(len(s.activeMatchingNativeHistograms), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(s.activeMatchingNativeHistogramBuckets), s.activeMatchingNativeHistogramBuckets)

oldest := int64(math.MaxInt64)
for ref, entry := range s.refs {
Expand All @@ -298,9 +378,18 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
}

s.active++
if entry.numNativeHistogramBuckets >= 0 {
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += entry.numNativeHistogramBuckets
}
ml := entry.matches.len()
for i := 0; i < ml; i++ {
s.activeMatching[entry.matches.get(i)]++
match := entry.matches.get(i)
s.activeMatching[match]++
if entry.numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]++
s.activeMatchingNativeHistogramBuckets[match] += entry.numNativeHistogramBuckets
}
}
if ts < oldest {
oldest = ts
Expand Down
Loading

0 comments on commit a9f4bdf

Please sign in to comment.