Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centralize metrics used by compactor and add user label to compactor metrics #6096

Merged
merged 6 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040
* [ENHANCEMENT] Ruler: Add support for filtering by `match` field on Rules API. #6083
* [ENHANCEMENT] Distributor: Reduce memory usage when error volume is high. #6095
* [ENHANCEMENT] Compactor: Centralize metrics used by compactor and add user label to compactor metrics. #6096
* [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
Expand Down
25 changes: 15 additions & 10 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

const (
defaultDeleteBlocksConcurrency = 16
reasonValueRetention = "retention"
)

type BlocksCleanerConfig struct {
Expand Down Expand Up @@ -56,15 +57,23 @@ type BlocksCleaner struct {
runsLastSuccess prometheus.Gauge
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForDeletion *prometheus.CounterVec
tenantBlocks *prometheus.GaugeVec
tenantBlocksMarkedForDelete *prometheus.GaugeVec
tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
tenantBucketIndexLastUpdate *prometheus.GaugeVec
}

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
func NewBlocksCleaner(
cfg BlocksCleanerConfig,
bucketClient objstore.InstrumentedBucket,
usersScanner *cortex_tsdb.UsersScanner,
cfgProvider ConfigProvider,
logger log.Logger,
reg prometheus.Registerer,
blocksMarkedForDeletion *prometheus.CounterVec,
) *BlocksCleaner {
c := &BlocksCleaner{
cfg: cfg,
bucketClient: bucketClient,
Expand Down Expand Up @@ -95,11 +104,7 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Instrumente
Name: "cortex_compactor_block_cleanup_failures_total",
Help: "Total number of blocks failed to be deleted.",
}),
blocksMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
ConstLabels: prometheus.Labels{"reason": "retention"},
}),
blocksMarkedForDeletion: blocksMarkedForDeletion,

// The following metrics don't have the "cortex_compactor" prefix because not strictly related to
// the compactor. They're just tracked by the compactor because it's the most logical place where these
Expand Down Expand Up @@ -374,7 +379,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
// We do not want to stop the remaining work in the cleaner if an
// error occurs here. Errors are logged in the function.
retention := c.cfgProvider.CompactorBlocksRetentionPeriod(userID)
c.applyUserRetentionPeriod(ctx, idx, retention, userBucket, userLogger)
c.applyUserRetentionPeriod(ctx, idx, retention, userBucket, userLogger, userID)
}

// Generate an updated in-memory version of the bucket index.
Expand Down Expand Up @@ -498,7 +503,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
}

// applyUserRetentionPeriod marks blocks for deletion which have aged past the retention period.
func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket objstore.Bucket, userLogger log.Logger) {
func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket objstore.Bucket, userLogger log.Logger, userID string) {
// The retention period of zero is a special value indicating to never delete.
if retention <= 0 {
return
Expand All @@ -511,7 +516,7 @@ func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucke
// the cleaner will retry applying the retention in its next cycle.
for _, b := range blocks {
level.Info(userLogger).Log("msg", "applied retention: marking block for deletion", "block", b.ID, "maxTime", b.MaxTime)
if err := block.MarkForDeletion(ctx, userLogger, userBucket, b.ID, fmt.Sprintf("block exceeding retention of %v", retention), c.blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(ctx, userLogger, userBucket, b.ID, fmt.Sprintf("block exceeding retention of %v", retention), c.blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil {
level.Warn(userLogger).Log("msg", "failed to mark block for deletion", "block", b.ID, "err", err)
}
}
Expand Down
47 changes: 35 additions & 12 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -79,8 +80,12 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)

// Clean User with no error
cleaner.bucketClient = bkt
Expand Down Expand Up @@ -176,8 +181,12 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -333,8 +342,12 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -393,8 +406,12 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -447,8 +464,12 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
require.NoError(t, cleaner.cleanUsers(ctx, true))

assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
Expand Down Expand Up @@ -578,8 +599,12 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)

assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
Expand Down Expand Up @@ -607,9 +632,6 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
# TYPE cortex_bucket_blocks_marked_for_deletion_count gauge
cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 0
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
# HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor.
# TYPE cortex_compactor_blocks_marked_for_deletion_total counter
cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
Expand Down Expand Up @@ -650,7 +672,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
# HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor.
# TYPE cortex_compactor_blocks_marked_for_deletion_total counter
cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 1
cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
Expand Down Expand Up @@ -688,7 +710,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
# HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor.
# TYPE cortex_compactor_blocks_marked_for_deletion_total counter
cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 1
cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
Expand Down Expand Up @@ -717,7 +739,8 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
# HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor.
# TYPE cortex_compactor_blocks_marked_for_deletion_total counter
cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 3
cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1
cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-2"} 2
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
Expand Down
Loading
Loading