From 2dd361a40620aeb16608427e0634a51700b28833 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Tue, 7 May 2024 11:05:21 -0700
Subject: [PATCH 1/2] Cleaning up stale ingester metrics

Signed-off-by: alanprot
---
 pkg/distributor/distributor.go      | 43 ++++++++++++++++++++
 pkg/distributor/distributor_test.go | 62 ++++++++++++++++++++++++++++-
 2 files changed, 104 insertions(+), 1 deletion(-)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index dfaa12d89c..4475b3c1a2 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -65,6 +65,8 @@ const (
 
 	instanceIngestionRateTickInterval = time.Second
 
+	clearStaleIngesterMetricsInterval = time.Minute
+
 	// mergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and
 	// it was based on empirical observation: See BenchmarkMergeSlicesParallel
 	mergeSlicesParallelism = 8
@@ -398,6 +400,9 @@ func (d *Distributor) running(ctx context.Context) error {
 	ingestionRateTicker := time.NewTicker(instanceIngestionRateTickInterval)
 	defer ingestionRateTicker.Stop()
 
+	staleIngesterMetricTicker := time.NewTicker(clearStaleIngesterMetricsInterval)
+	defer staleIngesterMetricTicker.Stop()
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -406,6 +411,9 @@ func (d *Distributor) running(ctx context.Context) error {
 		case <-ingestionRateTicker.C:
 			d.ingestionRate.Tick()
+		case <-staleIngesterMetricTicker.C:
+			d.cleanStaleIngesterMetrics()
+
 		case err := <-d.subservicesWatcher.Chan():
 			return errors.Wrap(err, "distributor subservice failed")
 		}
 	}
@@ -701,6 +709,41 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	return &cortexpb.WriteResponse{}, firstPartialErr
 }
 
+func (d *Distributor) cleanStaleIngesterMetrics() {
+	healthy, unhealthy, err := d.ingestersRing.GetAllInstanceDescs(ring.WriteNoExtend)
+	if err != nil {
+		level.Warn(d.log).Log("msg", "error cleaning metrics: GetAllInstanceDescs", "err", err)
+		return
+	}
+
+	ipsMap := map[string]struct{}{}
+
+	for _, ing := range append(healthy, unhealthy...) {
+		ipsMap[ing.Addr] = struct{}{}
+	}
+
+	ingesterMetrics := []*prometheus.CounterVec{d.ingesterAppends, d.ingesterAppendFailures, d.ingesterQueries, d.ingesterQueryFailures}
+
+	for _, m := range ingesterMetrics {
+		metrics, err := util.GetLabels(m, make(map[string]string))
+
+		if err != nil {
+			level.Warn(d.log).Log("msg", "error cleaning metrics: GetLabels", "err", err)
+			return
+		}
+
+		for _, lbls := range metrics {
+			if _, ok := ipsMap[lbls.Get("ingester")]; !ok {
+				err := util.DeleteMatchingLabels(m, map[string]string{"ingester": lbls.Get("ingester")})
+				if err != nil {
+					level.Warn(d.log).Log("msg", "error cleaning metrics: DeleteMatchingLabels", "err", err)
+					return
+				}
+			}
+		}
+	}
+}
+
 func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string) error {
 	span, _ := opentracing.StartSpanFromContext(ctx, "doBatch")
 	defer span.Finish()
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index bc729a93b1..135a5a9908 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -318,8 +318,10 @@ func TestDistributor_Push(t *testing.T) {
 func TestDistributor_MetricsCleanup(t *testing.T) {
 	t.Parallel()
 
-	dists, _, regs, _ := prepare(t, prepConfig{
+	dists, _, regs, r := prepare(t, prepConfig{
 		numDistributors: 1,
+		numIngesters:    2,
+		happyIngesters:  2,
 	})
 	d := dists[0]
 	reg := regs[0]
@@ -334,6 +336,10 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
 		"cortex_distributor_metadata_in_total",
 		"cortex_distributor_non_ha_samples_received_total",
 		"cortex_distributor_latest_seen_sample_timestamp_seconds",
+		"cortex_distributor_ingester_append_failures_total",
+		"cortex_distributor_ingester_appends_total",
+		"cortex_distributor_ingester_query_failures_total",
+		"cortex_distributor_ingester_queries_total",
 	}
 
 	d.receivedSamples.WithLabelValues("userA").Add(5)
@@ -349,6 +355,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
 	d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric
 	d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)
 
+	h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend)
+	d.ingesterAppends.WithLabelValues(h[0].Addr, typeMetadata).Inc()
+	d.ingesterAppendFailures.WithLabelValues(h[0].Addr, typeMetadata, "2xx").Inc()
+	d.ingesterAppends.WithLabelValues(h[1].Addr, typeMetadata).Inc()
+	d.ingesterAppendFailures.WithLabelValues(h[1].Addr, typeMetadata, "2xx").Inc()
+	d.ingesterQueries.WithLabelValues(h[0].Addr).Inc()
+	d.ingesterQueries.WithLabelValues(h[1].Addr).Inc()
+	d.ingesterQueryFailures.WithLabelValues(h[0].Addr).Inc()
+	d.ingesterQueryFailures.WithLabelValues(h[1].Addr).Inc()
+
 	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
 		# HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
 		# TYPE cortex_distributor_deduped_samples_total counter
@@ -388,10 +404,41 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
 		# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
 		# TYPE cortex_distributor_exemplars_in_total counter
 		cortex_distributor_exemplars_in_total{user="userA"} 5
+
+		# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
+		# TYPE cortex_distributor_ingester_append_failures_total counter
+		cortex_distributor_ingester_append_failures_total{ingester="0",status="2xx",type="metadata"} 1
+		cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
+		# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
+		# TYPE cortex_distributor_ingester_appends_total counter
+		cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
+		cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
+		# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
+		# TYPE cortex_distributor_ingester_queries_total counter
+		cortex_distributor_ingester_queries_total{ingester="0"} 1
+		cortex_distributor_ingester_queries_total{ingester="1"} 1
+		# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
+		# TYPE cortex_distributor_ingester_query_failures_total counter
+		cortex_distributor_ingester_query_failures_total{ingester="0"} 1
+		cortex_distributor_ingester_query_failures_total{ingester="1"} 1
 	`), metrics...))
 
 	d.cleanupInactiveUser("userA")
 
+	err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
+		r := in.(*ring.Desc)
+		delete(r.Ingesters, "0")
+		return in, true, nil
+	})
+
+	test.Poll(t, time.Second, true, func() interface{} {
+		ings, _, _ := r.GetAllInstanceDescs(ring.Write)
+		return len(ings) == 1
+	})
+
+	require.NoError(t, err)
+	d.cleanStaleIngesterMetrics()
+
 	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
 		# HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
 		# TYPE cortex_distributor_deduped_samples_total counter
@@ -422,6 +469,19 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
 
 		# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
 		# TYPE cortex_distributor_exemplars_in_total counter
+
+		# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
+		# TYPE cortex_distributor_ingester_append_failures_total counter
+		cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
+		# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
+		# TYPE cortex_distributor_ingester_appends_total counter
+		cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
+		# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
+		# TYPE cortex_distributor_ingester_queries_total counter
+		cortex_distributor_ingester_queries_total{ingester="1"} 1
+		# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
+		# TYPE cortex_distributor_ingester_query_failures_total counter
+		cortex_distributor_ingester_query_failures_total{ingester="1"} 1
 	`), metrics...))
 }
 

From 5d2a169c0c9a5619b8bc49cf54eb7c0c08356a32 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Tue, 7 May 2024 11:10:06 -0700
Subject: [PATCH 2/2] changelog

Signed-off-by: alanprot
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f0f97c1de3..934c246c98 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
 * [ENHANCEMENT] Ingester: Allowing to configure `-blocks-storage.tsdb.head-compaction-interval` flag up to 30 min and add a jitter on the first head compaction. #5919
 * [ENHANCEMENT] Distributor: Added `max_inflight_push_requests` config to ingester client to protect distributor from OOMKilled. #5917
+* [ENHANCEMENT] Distributor/Querier: Clean stale per-ingester metrics after ingester restarts. #5930
 * [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920
 
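---

Reviewer note, not part of either patch: the technique in cleanStaleIngesterMetrics can be
exercised in isolation with plain client_golang, without the Cortex ring or the
util.GetLabels / util.DeleteMatchingLabels helpers. The sketch below is a minimal
standalone Go program under these assumptions: client_golang v1.12+ (for
DeletePartialMatch), a made-up metric name, and placeholder ingester addresses. It
gathers a snapshot of the registered series, then drops every series whose "ingester"
label is absent from the set of live ring members, which is the same shape as the
function added in PATCH 1/2.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	appends := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "distributor_ingester_appends_total",
		Help: "The total number of batch appends sent to ingesters.",
	}, []string{"ingester"})
	reg.MustRegister(appends)

	// Two ingesters have reported metrics; afterwards only the first is still
	// in the ring. Both addresses are placeholders, not values from the PR.
	appends.WithLabelValues("10.0.0.1:9095").Inc()
	appends.WithLabelValues("10.0.0.2:9095").Inc()
	live := map[string]struct{}{"10.0.0.1:9095": {}}

	// Gather a snapshot of the registered series, then delete every series
	// whose ingester address is no longer in the live set. Gather returns a
	// copy, so deleting from the vec while iterating the snapshot is safe.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			for _, lp := range m.GetLabel() {
				if lp.GetName() != "ingester" {
					continue
				}
				if _, ok := live[lp.GetValue()]; !ok {
					appends.DeletePartialMatch(prometheus.Labels{"ingester": lp.GetValue()})
				}
			}
		}
	}

	mfs, _ = reg.Gather()
	fmt.Println(len(mfs[0].GetMetric())) // 1: only the live ingester's series remains
}

In the PR itself the same enumerate-then-delete loop runs on a one-minute ticker from
the distributor's running() loop, and the live set comes from
ingestersRing.GetAllInstanceDescs(ring.WriteNoExtend) so that both healthy and
unhealthy (but still registered) ingesters keep their series.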