Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Differentiate retry and halt error and retry failed compaction only on retriable error #6111

Merged
merged 6 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [ENHANCEMENT] Distributor: Reduce memory usage when error volume is high. #6095
* [ENHANCEMENT] Compactor: Centralize metrics used by compactor and add user label to compactor metrics. #6096
* [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097
* [ENHANCEMENT] Compactor: Differentiate retry and halt error and retry failed compaction only on retriable error. #6111
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
Expand Down
11 changes: 10 additions & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,10 +792,19 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e
if lastErr == nil {
return nil
}
if ctx.Err() != nil {
return ctx.Err()
}
if c.isCausedByPermissionDenied(lastErr) {
level.Warn(c.logger).Log("msg", "skipping compactUser due to PermissionDenied", "user", userID, "err", lastErr)
level.Warn(c.logger).Log("msg", "skipping compactUser due to PermissionDenied", "org_id", userID, "err", lastErr)
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
if compact.IsHaltError(lastErr) {
level.Error(c.logger).Log("msg", "compactor returned critical error", "org_id", userID, "err", lastErr)
c.compactorMetrics.compactionHaltErrors.WithLabelValues(userID).Inc()
return lastErr
}
c.compactorMetrics.compactionRetryErrors.WithLabelValues(userID).Inc()

retries.Wait()
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/compactor/compactor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type compactorMetrics struct {
compactionFailures *prometheus.CounterVec
verticalCompactions *prometheus.CounterVec
remainingPlannedCompactions *prometheus.GaugeVec
compactionRetryErrors *prometheus.CounterVec
compactionHaltErrors *prometheus.CounterVec
}

const (
Expand Down Expand Up @@ -159,6 +161,14 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
Name: "cortex_compactor_remaining_planned_compactions",
Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy",
}, commonLabels)
m.compactionRetryErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
danielblando marked this conversation as resolved.
Show resolved Hide resolved
Name: "cortex_compactor_compaction_retry_error_total",
Help: "Total number of retry errors from compactions.",
}, commonLabels)
m.compactionHaltErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_compactor_compaction_halt_error_total",
Help: "Total number of halt errors from compactions.",
}, commonLabels)

return &m
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/compactor/compactor_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ func TestSyncerMetrics(t *testing.T) {
cortex_compactor_remaining_planned_compactions{user="aaa"} 377740
cortex_compactor_remaining_planned_compactions{user="bbb"} 388850
cortex_compactor_remaining_planned_compactions{user="ccc"} 399960
# HELP cortex_compactor_compaction_halt_error_total Total number of halt errors from compactions.
# TYPE cortex_compactor_compaction_halt_error_total counter
cortex_compactor_compaction_halt_error_total{user="aaa"} 444400
cortex_compactor_compaction_halt_error_total{user="bbb"} 455510
cortex_compactor_compaction_halt_error_total{user="ccc"} 466620
# HELP cortex_compactor_compaction_retry_error_total Total number of retry errors from compactions.
# TYPE cortex_compactor_compaction_retry_error_total counter
cortex_compactor_compaction_retry_error_total{user="aaa"} 411070
cortex_compactor_compaction_retry_error_total{user="bbb"} 422180
cortex_compactor_compaction_retry_error_total{user="ccc"} 433290
`))
require.NoError(t, err)

Expand Down Expand Up @@ -163,4 +173,10 @@ func generateTestData(cm *compactorMetrics, base float64) {
cm.remainingPlannedCompactions.WithLabelValues("aaa").Add(34 * base)
cm.remainingPlannedCompactions.WithLabelValues("bbb").Add(35 * base)
cm.remainingPlannedCompactions.WithLabelValues("ccc").Add(36 * base)
cm.compactionRetryErrors.WithLabelValues("aaa").Add(37 * base)
cm.compactionRetryErrors.WithLabelValues("bbb").Add(38 * base)
cm.compactionRetryErrors.WithLabelValues("ccc").Add(39 * base)
cm.compactionHaltErrors.WithLabelValues("aaa").Add(40 * base)
cm.compactionHaltErrors.WithLabelValues("bbb").Add(41 * base)
cm.compactionHaltErrors.WithLabelValues("ccc").Add(42 * base)
}
110 changes: 108 additions & 2 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1646,8 +1646,8 @@ func (m *tsdbPlannerMock) getNoCompactBlocks() []string {
return result
}

func mockBlockMetaJSON(id string) string {
meta := tsdb.BlockMeta{
func mockBlockMeta(id string) tsdb.BlockMeta {
return tsdb.BlockMeta{
Version: 1,
ULID: ulid.MustParse(id),
MinTime: 1574776800000,
Expand All @@ -1657,6 +1657,10 @@ func mockBlockMetaJSON(id string) string {
Sources: []ulid.ULID{ulid.MustParse(id)},
},
}
}

func mockBlockMetaJSON(id string) string {
meta := mockBlockMeta(id)

content, err := json.Marshal(meta)
if err != nil {
Expand Down Expand Up @@ -2019,3 +2023,105 @@ func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFromBucket(t

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
}

func TestCompactor_FailedWithRetriableError(t *testing.T) {
t.Parallel()

ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion}
content, err := json.Marshal(ss)
require.NoError(t, err)

bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter("", []string{"user-1"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", nil, errors.New("test retriable error"))
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockIter("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil, errors.New("test retriable error"))
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

cfg := prepareConfig()
cfg.CompactionRetries = 2

c, _, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{{BlockMeta: mockBlockMeta("01DTVP434PA9VFXSW2JKB3392D")}, {BlockMeta: mockBlockMeta("01DTW0ZCPDDNV4BV83Q2SV4QAZ")}}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

cortex_testutil.Poll(t, 1*time.Second, 2.0, func() interface{} {
return prom_testutil.ToFloat64(c.compactorMetrics.compactionRetryErrors.WithLabelValues("user-1"))
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))

assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
# HELP cortex_compactor_compaction_retry_error_total Total number of retry errors from compactions.
# TYPE cortex_compactor_compaction_retry_error_total counter
cortex_compactor_compaction_retry_error_total{user="user-1"} 2
`),
"cortex_compactor_compaction_retry_error_total",
"cortex_compactor_compaction_halt_error_total",
))
}

func TestCompactor_FailedWithHaltError(t *testing.T) {
t.Parallel()

ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion}
content, err := json.Marshal(ss)
require.NoError(t, err)

bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter("", []string{"user-1"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", nil, compact.HaltError{})
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockIter("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil, compact.HaltError{})
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

cfg := prepareConfig()
cfg.CompactionRetries = 2

c, _, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{{BlockMeta: mockBlockMeta("01DTVP434PA9VFXSW2JKB3392D")}, {BlockMeta: mockBlockMeta("01DTW0ZCPDDNV4BV83Q2SV4QAZ")}}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

cortex_testutil.Poll(t, 1*time.Second, 1.0, func() interface{} {
return prom_testutil.ToFloat64(c.compactorMetrics.compactionHaltErrors.WithLabelValues("user-1"))
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))

assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
# HELP cortex_compactor_compaction_halt_error_total Total number of halt errors from compactions.
# TYPE cortex_compactor_compaction_halt_error_total counter
cortex_compactor_compaction_halt_error_total{user="user-1"} 1
`),
"cortex_compactor_compaction_retry_error_total",
"cortex_compactor_compaction_halt_error_total",
))
}
Loading