Skip to content

Commit

Permalink
Centralize metrics used by compactor and add user label to compactor …
Browse files Browse the repository at this point in the history
…metrics (#6096)

* Centralize metrics used by compactor and add user label to compactor metrics.

Signed-off-by: Alex Le <leqiyue@amazon.com>

* Updated CHANGELOG

Signed-off-by: Alex Le <leqiyue@amazon.com>

* addressed comments

Signed-off-by: Alex Le <leqiyue@amazon.com>

* Added back missing metric

Signed-off-by: Alex Le <leqiyue@amazon.com>

---------

Signed-off-by: Alex Le <leqiyue@amazon.com>
  • Loading branch information
alexqyle authored Jul 23, 2024
1 parent 75cfab4 commit 3ae12a2
Show file tree
Hide file tree
Showing 11 changed files with 596 additions and 712 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` fields on Rules API. #6040
* [ENHANCEMENT] Ruler: Add support for filtering by `match` field on Rules API. #6083
* [ENHANCEMENT] Distributor: Reduce memory usage when error volume is high. #6095
* [ENHANCEMENT] Compactor: Centralize metrics used by compactor and add user label to compactor metrics. #6096
* [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` fields on Rules API. #6040
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
Expand Down
25 changes: 15 additions & 10 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

const (
// defaultDeleteBlocksConcurrency is presumably the default number of blocks
// deleted concurrently by the cleaner — its use site is not visible here, confirm.
defaultDeleteBlocksConcurrency = 16
// reasonValueRetention is the "reason" label value recorded on the
// blocks-marked-for-deletion counter when a block is marked because it
// aged past the tenant's retention period.
reasonValueRetention = "retention"
)

type BlocksCleanerConfig struct {
Expand Down Expand Up @@ -56,15 +57,23 @@ type BlocksCleaner struct {
runsLastSuccess prometheus.Gauge
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForDeletion *prometheus.CounterVec
tenantBlocks *prometheus.GaugeVec
tenantBlocksMarkedForDelete *prometheus.GaugeVec
tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
tenantBucketIndexLastUpdate *prometheus.GaugeVec
}

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
func NewBlocksCleaner(
cfg BlocksCleanerConfig,
bucketClient objstore.InstrumentedBucket,
usersScanner *cortex_tsdb.UsersScanner,
cfgProvider ConfigProvider,
logger log.Logger,
reg prometheus.Registerer,
blocksMarkedForDeletion *prometheus.CounterVec,
) *BlocksCleaner {
c := &BlocksCleaner{
cfg: cfg,
bucketClient: bucketClient,
Expand Down Expand Up @@ -95,11 +104,7 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Instrumente
Name: "cortex_compactor_block_cleanup_failures_total",
Help: "Total number of blocks failed to be deleted.",
}),
blocksMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
ConstLabels: prometheus.Labels{"reason": "retention"},
}),
blocksMarkedForDeletion: blocksMarkedForDeletion,

// The following metrics don't have the "cortex_compactor" prefix because they are not strictly related to
// the compactor. They're just tracked by the compactor because it's the most logical place where these
Expand Down Expand Up @@ -374,7 +379,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
// We do not want to stop the remaining work in the cleaner if an
// error occurs here. Errors are logged in the function.
retention := c.cfgProvider.CompactorBlocksRetentionPeriod(userID)
c.applyUserRetentionPeriod(ctx, idx, retention, userBucket, userLogger)
c.applyUserRetentionPeriod(ctx, idx, retention, userBucket, userLogger, userID)
}

// Generate an updated in-memory version of the bucket index.
Expand Down Expand Up @@ -498,7 +503,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
}

// applyUserRetentionPeriod marks blocks for deletion which have aged past the retention period.
func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket objstore.Bucket, userLogger log.Logger) {
func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket objstore.Bucket, userLogger log.Logger, userID string) {
// A retention period of zero (or any non-positive value) is a special value meaning blocks are never deleted.
if retention <= 0 {
return
Expand All @@ -511,7 +516,7 @@ func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucke
// the cleaner will retry applying the retention in its next cycle.
for _, b := range blocks {
level.Info(userLogger).Log("msg", "applied retention: marking block for deletion", "block", b.ID, "maxTime", b.MaxTime)
if err := block.MarkForDeletion(ctx, userLogger, userBucket, b.ID, fmt.Sprintf("block exceeding retention of %v", retention), c.blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(ctx, userLogger, userBucket, b.ID, fmt.Sprintf("block exceeding retention of %v", retention), c.blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil {
level.Warn(userLogger).Log("msg", "failed to mark block for deletion", "block", b.ID, "err", err)
}
}
Expand Down
47 changes: 35 additions & 12 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -79,8 +80,12 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)

// Clean User with no error
cleaner.bucketClient = bkt
Expand Down Expand Up @@ -176,8 +181,12 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -333,8 +342,12 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -393,8 +406,12 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -447,8 +464,12 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
require.NoError(t, cleaner.cleanUsers(ctx, true))

assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
Expand Down Expand Up @@ -578,8 +599,12 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)

assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
Expand Down Expand Up @@ -607,9 +632,6 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
# TYPE cortex_bucket_blocks_marked_for_deletion_count gauge
cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 0
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
# HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor.
# TYPE cortex_compactor_blocks_marked_for_deletion_total counter
cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
Expand Down Expand Up @@ -650,7 +672,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
# HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor.
# TYPE cortex_compactor_blocks_marked_for_deletion_total counter
cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 1
cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
Expand Down Expand Up @@ -688,7 +710,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
# HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor.
# TYPE cortex_compactor_blocks_marked_for_deletion_total counter
cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 1
cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
Expand Down Expand Up @@ -717,7 +739,8 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
# HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor.
# TYPE cortex_compactor_blocks_marked_for_deletion_total counter
cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 3
cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1
cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-2"} 2
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
Expand Down
Loading

0 comments on commit 3ae12a2

Please sign in to comment.