Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add histogram billing metrics to ingester #5318

Merged
merged 9 commits into from
Jul 11, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [FEATURE] Ruler: Support filtering results from rule status endpoint by `file`, `rule_group` and `rule_name`. #5291
* [FEATURE] Ingester: add experimental support for creating tokens by using the spread-minimizing strategy. This can be enabled with `-ingester.ring.token-generation-strategy: spread-minimizing` and `-ingester.ring.spread-minimizing-zones: <all available zones>`. In that case `-ingester.ring.tokens-file-path` must be empty. #5308 #5324
* [FEATURE] Ingester: add experimental support to compact the TSDB Head when the number of in-memory series is equal to or greater than `-blocks-storage.tsdb.early-head-compaction-min-in-memory-series`, and the ingester estimates that the per-tenant TSDB Head compaction will reduce in-memory series by at least `-blocks-storage.tsdb.early-head-compaction-min-estimated-series-reduction-percentage`. #5371
* [FEATURE] Ingester: add new metrics for tracking native histograms in active series: `cortex_ingester_active_native_histogram_series`, `cortex_ingester_active_native_histogram_series_custom_tracker`, `cortex_ingester_active_native_histogram_buckets`, `cortex_ingester_active_native_histogram_buckets_custom_tracker`. The first two are subsets of the existing and unmodified `cortex_ingester_active_series` and `cortex_ingester_active_series_custom_tracker` respectively, tracking only native histogram series; the last two are their equivalents for tracking the number of buckets in native histogram series. #5318
* [ENHANCEMENT] Overrides-exporter: Add new metrics for write path and alertmanager (`max_global_metadata_per_user`, `max_global_metadata_per_metric`, `request_rate`, `request_burst_size`, `alertmanager_notification_rate_limit`, `alertmanager_max_dispatcher_aggregation_groups`, `alertmanager_max_alerts_count`, `alertmanager_max_alerts_size_bytes`) and added flag `-overrides-exporter.enabled-metrics` to explicitly configure desired metrics, e.g. `-overrides-exporter.enabled-metrics=request_rate,ingestion_rate`. Default value for this flag is: `ingestion_rate,ingestion_burst_size,max_global_series_per_user,max_global_series_per_metric,max_global_exemplars_per_user,max_fetched_chunks_per_query,max_fetched_series_per_query,ruler_max_rules_per_rule_group,ruler_max_rule_groups_per_tenant`. #5376
* [ENHANCEMENT] Cardinality API: When zone-aware replication is enabled, the label values cardinality API can now tolerate a single-zone failure. #5178
* [ENHANCEMENT] Distributor: optimize sending requests to ingesters when incoming requests don't need to be modified. #5137 #5389
Expand Down
12 changes: 6 additions & 6 deletions pkg/ingester/activeseries/active_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ func TestPostings_Expand(t *testing.T) {

// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0))
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0), -1)
}

valid := activeSeries.Purge(mockedTime)
allActive, _ := activeSeries.ActiveWithMatchers()
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 2, allActive)

Expand Down Expand Up @@ -60,11 +60,11 @@ func TestPostings_Seek(t *testing.T) {

// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0))
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0), -1)
}

valid := activeSeries.Purge(mockedTime)
allActive, _ := activeSeries.ActiveWithMatchers()
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 2, allActive)

Expand Down Expand Up @@ -92,11 +92,11 @@ func TestPostings_SeekToEnd(t *testing.T) {

// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0))
activeSeries.UpdateSeries(series[i], uint64(allStorageRefs[i]), time.Unix(int64(i), 0), -1)
}

valid := activeSeries.Purge(mockedTime)
allActive, _ := activeSeries.ActiveWithMatchers()
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 0, allActive)

Expand Down
161 changes: 125 additions & 36 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,21 @@ type seriesStripe struct {
// without holding the lock -- hence the atomic).
oldestEntryTs atomic.Int64

mu sync.RWMutex
refs map[uint64]seriesEntry
active int // Number of active entries in this stripe. Only decreased during purge or clear.
activeMatching []int // Number of active entries in this stripe matching each matcher of the configured Matchers.
mu sync.RWMutex
refs map[uint64]seriesEntry
active int // Number of active entries in this stripe. Only decreased during purge or clear.
activeMatching []int // Number of active entries in this stripe matching each matcher of the configured Matchers.
activeNativeHistograms int // Number of active entries (only native histograms) in this stripe. Only decreased during purge or clear.
activeMatchingNativeHistograms []int // Number of active entries (only native histograms) in this stripe matching each matcher of the configured Matchers.
activeNativeHistogramBuckets int // Number of buckets in active native histogram entries in this stripe. Only decreased during purge or clear.
activeMatchingNativeHistogramBuckets []int // Number of buckets in active native histogram entries in this stripe matching each matcher of the configured Matchers.
}

// seriesEntry holds a timestamp for a single series.
type seriesEntry struct {
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches preAllocDynamicSlice // Index of the matcher matching
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches preAllocDynamicSlice // Index of the matcher matching
numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram.
}

func NewActiveSeries(asm *Matchers, timeout time.Duration) *ActiveSeries {
Expand Down Expand Up @@ -104,10 +109,11 @@ func (c *ActiveSeries) CurrentConfig() CustomTrackersConfig {
}

// UpdateSeries updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet.
func (c *ActiveSeries) UpdateSeries(series labels.Labels, ref uint64, now time.Time) {
// Pass -1 in numNativeHistogramBuckets if the series is not a native histogram series.
func (c *ActiveSeries) UpdateSeries(series labels.Labels, ref uint64, now time.Time, numNativeHistogramBuckets int) {
zenador marked this conversation as resolved.
Show resolved Hide resolved
stripeID := ref % numStripes

c.stripes[stripeID].updateSeriesTimestamp(now, series, ref)
c.stripes[stripeID].updateSeriesTimestamp(now, series, ref, numNativeHistogramBuckets)
}

// Purge purges expired entries and returns true if enough time has passed since
Expand All @@ -134,31 +140,41 @@ func (c *ActiveSeries) ContainsRef(ref uint64) bool {
return c.stripes[stripeID].containsRef(ref)
}

// Active returns the total number of active series. This method does not purge
// expired entries, so Purge should be called periodically.
func (c *ActiveSeries) Active() int {
total := 0
// Active returns the total numbers of active series, active native
// histogram series, and buckets of those native histogram series.
// This method does not purge expired entries, so Purge should be
// called periodically.
func (c *ActiveSeries) Active() (total, totalNativeHistograms, totalNativeHistogramBuckets int) {
for s := 0; s < numStripes; s++ {
total += c.stripes[s].getTotal()
all, histograms, buckets := c.stripes[s].getTotal()
total += all
totalNativeHistograms += histograms
totalNativeHistogramBuckets += buckets
}
return total
return
}

// ActiveWithMatchers returns the total number of active series, as well as a
// slice of active series matching each one of the custom trackers provided (in
// the same order as custom trackers are defined). This method does not purge
// the same order as custom trackers are defined), and then the same thing for
// only active series that are native histograms, then the same for the number
// of buckets in those active native histogram series. This method does not purge
// expired entries, so Purge should be called periodically.
func (c *ActiveSeries) ActiveWithMatchers() (int, []int) {
func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, totalNativeHistograms int, totalMatchingNativeHistograms []int, totalNativeHistogramBuckets int, totalMatchingNativeHistogramBuckets []int) {
c.matchersMutex.RLock()
defer c.matchersMutex.RUnlock()

total := 0
totalMatching := resizeAndClear(len(c.matchers.MatcherNames()), nil)
totalMatching = resizeAndClear(len(c.matchers.MatcherNames()), nil)
totalMatchingNativeHistograms = resizeAndClear(len(c.matchers.MatcherNames()), nil)
totalMatchingNativeHistogramBuckets = resizeAndClear(len(c.matchers.MatcherNames()), nil)
for s := 0; s < numStripes; s++ {
total += c.stripes[s].getTotalAndUpdateMatching(totalMatching)
all, histograms, buckets := c.stripes[s].getTotalAndUpdateMatching(totalMatching, totalMatchingNativeHistograms, totalMatchingNativeHistogramBuckets)
total += all
totalNativeHistograms += histograms
totalNativeHistogramBuckets += buckets
}

return total, totalMatching
return
}

func (s *seriesStripe) containsRef(ref uint64) bool {
Expand All @@ -170,33 +186,42 @@ func (s *seriesStripe) containsRef(ref uint64) bool {
}

// getTotal will return the total active series in the stripe
func (s *seriesStripe) getTotal() int {
// and the total active series that are native histograms
// and the total number of buckets in active native histogram series
func (s *seriesStripe) getTotal() (int, int, int) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.active
return s.active, s.activeNativeHistograms, s.activeNativeHistogramBuckets
}

// getTotalAndUpdateMatching will return the total active series in the stripe and also update the slice provided
// with each matcher's total.
func (s *seriesStripe) getTotalAndUpdateMatching(matching []int) int {
// with each matcher's total, and will also do the same for total active series that are native histograms
// as well as the total number of buckets in active native histogram series
func (s *seriesStripe) getTotalAndUpdateMatching(matching []int, matchingNativeHistograms []int, matchingNativeHistogramBuckets []int) (int, int, int) {
s.mu.RLock()
defer s.mu.RUnlock()

// len(matching) == len(s.activeMatching) by design, and it could be nil
for i, a := range s.activeMatching {
matching[i] += a
}
for i, a := range s.activeMatchingNativeHistograms {
matchingNativeHistograms[i] += a
}
for i, a := range s.activeMatchingNativeHistogramBuckets {
matchingNativeHistogramBuckets[i] += a
}

return s.active
return s.active, s.activeNativeHistograms, s.activeNativeHistogramBuckets
}

func (s *seriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, ref uint64) {
func (s *seriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, ref uint64, numNativeHistogramBuckets int) {
nowNanos := now.UnixNano()

e := s.findEntryForSeries(ref)
e, needsUpdating := s.findEntryForSeries(ref, numNativeHistogramBuckets)
entryTimeSet := false
if e == nil {
e, entryTimeSet = s.findOrCreateEntryForSeries(ref, series, nowNanos)
if e == nil || needsUpdating {
e, entryTimeSet = s.findAndUpdateOrCreateEntryForSeries(ref, series, nowNanos, numNativeHistogramBuckets)
}

if !entryTimeSet {
Expand All @@ -216,34 +241,77 @@ func (s *seriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels
}
}

func (s *seriesStripe) findEntryForSeries(ref uint64) *atomic.Int64 {
func (s *seriesStripe) findEntryForSeries(ref uint64, numNativeHistogramBuckets int) (*atomic.Int64, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.refs[ref].nanos
entry := s.refs[ref]
return entry.nanos, entry.numNativeHistogramBuckets != numNativeHistogramBuckets
}

func (s *seriesStripe) findOrCreateEntryForSeries(ref uint64, series labels.Labels, nowNanos int64) (*atomic.Int64, bool) {
func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref uint64, series labels.Labels, nowNanos int64, numNativeHistogramBuckets int) (*atomic.Int64, bool) {
s.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO note: after this PR we should check how often the bucket count changes and whether we should merge findEntryForSeries and findAndUpdateOrCreateEntryForSeries to avoid double locking and refs lookup.

defer s.mu.Unlock()

// Check if already exists within the entries.
// This repeats findEntryForSeries(), but under write lock.
entry, ok := s.refs[ref]
if ok {
if entry.numNativeHistogramBuckets != numNativeHistogramBuckets {
matches := s.matchers.matches(series)
matchesLen := matches.len()
if numNativeHistogramBuckets >= 0 && entry.numNativeHistogramBuckets >= 0 {
// change number of buckets but still a histogram
diff := numNativeHistogramBuckets - entry.numNativeHistogramBuckets
s.activeNativeHistogramBuckets += diff
for i := 0; i < matchesLen; i++ {
s.activeMatchingNativeHistogramBuckets[matches.get(i)] += diff
}
} else if numNativeHistogramBuckets >= 0 {
// change from float to histogram
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += numNativeHistogramBuckets
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
s.activeMatchingNativeHistograms[match]++
s.activeMatchingNativeHistogramBuckets[match] += numNativeHistogramBuckets
}
} else {
// change from histogram to float
s.activeNativeHistograms--
s.activeNativeHistogramBuckets -= entry.numNativeHistogramBuckets
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
s.activeMatchingNativeHistograms[match]--
s.activeMatchingNativeHistogramBuckets[match] -= entry.numNativeHistogramBuckets
}
}
entry.numNativeHistogramBuckets = numNativeHistogramBuckets
s.refs[ref] = entry
}
return entry.nanos, false
}

matches := s.matchers.matches(series)
matchesLen := matches.len()

s.active++
if numNativeHistogramBuckets >= 0 {
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += numNativeHistogramBuckets
}
for i := 0; i < matchesLen; i++ {
s.activeMatching[matches.get(i)]++
match := matches.get(i)
s.activeMatching[match]++
if numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]++
s.activeMatchingNativeHistogramBuckets[match] += numNativeHistogramBuckets
}
}

e := seriesEntry{
nanos: atomic.NewInt64(nowNanos),
matches: matches,
nanos: atomic.NewInt64(nowNanos),
matches: matches,
numNativeHistogramBuckets: numNativeHistogramBuckets,
}

s.refs[ref] = e
Expand All @@ -259,8 +327,12 @@ func (s *seriesStripe) clear() {
s.oldestEntryTs.Store(0)
s.refs = map[uint64]seriesEntry{}
s.active = 0
s.activeNativeHistograms = 0
s.activeNativeHistogramBuckets = 0
for i := range s.activeMatching {
s.activeMatching[i] = 0
s.activeMatchingNativeHistograms[i] = 0
s.activeMatchingNativeHistogramBuckets[i] = 0
}
}

Expand All @@ -272,8 +344,12 @@ func (s *seriesStripe) reinitialize(asm *Matchers) {
s.oldestEntryTs.Store(0)
s.refs = map[uint64]seriesEntry{}
s.active = 0
s.activeNativeHistograms = 0
s.activeNativeHistogramBuckets = 0
s.matchers = asm
s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching)
s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
}

func (s *seriesStripe) purge(keepUntil time.Time) {
Expand All @@ -287,7 +363,11 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
defer s.mu.Unlock()

s.active = 0
s.activeNativeHistograms = 0
s.activeNativeHistogramBuckets = 0
s.activeMatching = resizeAndClear(len(s.activeMatching), s.activeMatching)
s.activeMatchingNativeHistograms = resizeAndClear(len(s.activeMatchingNativeHistograms), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(s.activeMatchingNativeHistogramBuckets), s.activeMatchingNativeHistogramBuckets)

oldest := int64(math.MaxInt64)
for ref, entry := range s.refs {
Expand All @@ -298,9 +378,18 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
}

s.active++
if entry.numNativeHistogramBuckets >= 0 {
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += entry.numNativeHistogramBuckets
}
ml := entry.matches.len()
for i := 0; i < ml; i++ {
s.activeMatching[entry.matches.get(i)]++
match := entry.matches.get(i)
s.activeMatching[match]++
if entry.numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]++
s.activeMatchingNativeHistogramBuckets[match] += entry.numNativeHistogramBuckets
}
}
if ts < oldest {
oldest = ts
Expand Down
Loading