Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce cleaner visit marker #6113

Merged
merged 18 commits into from
Jul 31, 2024
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* [ENHANCEMENT] Compactor: Differentiate retry and halt error and retry failed compaction only on retriable error. #6111
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040
* [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112
* [ENHANCEMENT] Compactor: Introduce cleaner visit marker. #6113
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018
Expand Down
10 changes: 10 additions & 0 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,16 @@ compactor:
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]

# How long after its last update a cleaner visit marker file is considered
# expired and can be picked up by a cleaner again.
# CLI flag: -compactor.cleaner-visit-marker-timeout
[cleaner_visit_marker_timeout: <duration> | default = 10m]
alexqyle marked this conversation as resolved.
Show resolved Hide resolved

# How frequently cleaner visit marker file should be updated when cleaning
# user.
# CLI flag: -compactor.cleaner-visit-marker-file-update-interval
[cleaner_visit_marker_file_update_interval: <duration> | default = 5m]

# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]
Expand Down
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2232,6 +2232,15 @@ sharding_ring:
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]

# How long after its last update a cleaner visit marker file is considered
# expired and can be picked up by a cleaner again.
# CLI flag: -compactor.cleaner-visit-marker-timeout
[cleaner_visit_marker_timeout: <duration> | default = 10m]

# How frequently cleaner visit marker file should be updated when cleaning user.
# CLI flag: -compactor.cleaner-visit-marker-file-update-interval
[cleaner_visit_marker_file_update_interval: <duration> | default = 5m]

# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]
Expand Down
64 changes: 59 additions & 5 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ type BlocksCleaner struct {
bucketClient objstore.InstrumentedBucket
usersScanner *cortex_tsdb.UsersScanner

ringLifecyclerID string

// Keep track of the last owned users.
lastOwnedUsers []string

cleanerVisitMarkerTimeout time.Duration
cleanerVisitMarkerFileUpdateInterval time.Duration

// Metrics.
runsStarted *prometheus.CounterVec
runsCompleted *prometheus.CounterVec
Expand All @@ -61,6 +66,8 @@ type BlocksCleaner struct {
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion *prometheus.CounterVec
cleanerVisitMarkerReadFailed prometheus.Counter
cleanerVisitMarkerWriteFailed prometheus.Counter
tenantBlocks *prometheus.GaugeVec
tenantBlocksMarkedForDelete *prometheus.GaugeVec
tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
Expand All @@ -76,15 +83,21 @@ func NewBlocksCleaner(
usersScanner *cortex_tsdb.UsersScanner,
cfgProvider ConfigProvider,
logger log.Logger,
ringLifecyclerID string,
reg prometheus.Registerer,
cleanerVisitMarkerTimeout time.Duration,
cleanerVisitMarkerFileUpdateInterval time.Duration,
blocksMarkedForDeletion *prometheus.CounterVec,
) *BlocksCleaner {
c := &BlocksCleaner{
cfg: cfg,
bucketClient: bucketClient,
usersScanner: usersScanner,
cfgProvider: cfgProvider,
logger: log.With(logger, "component", "cleaner"),
cfg: cfg,
bucketClient: bucketClient,
usersScanner: usersScanner,
cfgProvider: cfgProvider,
logger: log.With(logger, "component", "cleaner"),
ringLifecyclerID: ringLifecyclerID,
cleanerVisitMarkerTimeout: cleanerVisitMarkerTimeout,
cleanerVisitMarkerFileUpdateInterval: cleanerVisitMarkerFileUpdateInterval,
runsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_compactor_block_cleanup_started_total",
Help: "Total number of blocks cleanup runs started.",
Expand All @@ -110,6 +123,14 @@ func NewBlocksCleaner(
Help: "Total number of blocks failed to be deleted.",
}),
blocksMarkedForDeletion: blocksMarkedForDeletion,
cleanerVisitMarkerReadFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_cleaner_visit_marker_read_failed",
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
Help: "Number of cleaner visit marker file failed to be read.",
}),
cleanerVisitMarkerWriteFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_cleaner_visit_marker_write_failed_total",
Help: "Total number of cleaner visit marker file failed to be written.",
}),
alexqyle marked this conversation as resolved.
Show resolved Hide resolved

// The following metrics don't have the "cortex_compactor" prefix because not strictly related to
// the compactor. They're just tracked by the compactor because it's the most logical place where these
Expand Down Expand Up @@ -154,6 +175,10 @@ type cleanerJob struct {
timestamp int64
}

func (c *BlocksCleaner) SetRingLifecyclerID(ringLifecyclerID string) {
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
c.ringLifecyclerID = ringLifecyclerID
}

func (c *BlocksCleaner) starting(ctx context.Context) error {
// Run a cleanup so that any other service depending on this service
// is guaranteed to start once the initial cleanup has been done.
Expand Down Expand Up @@ -246,7 +271,15 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string,
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
if err != nil {
return err
}
if isVisited {
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
errChan <- nil
}()
Expand All @@ -273,7 +306,15 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
if err != nil {
return err
}
if isVisited {
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
defer func() {
errChan <- nil
}()
Expand Down Expand Up @@ -307,6 +348,19 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
return users, deleted, nil
}

// obtainVisitMarkerManager builds a VisitMarkerManager for the given tenant
// bucket and reads any cleaner visit marker already stored there.
//
// Results:
//   - a read error other than errorVisitMarkerNotFound is wrapped and returned
//     together with a nil manager;
//   - when no marker exists, the manager is returned with isVisited == false;
//   - otherwise isVisited reports whether the existing marker is still
//     considered visited under c.cleanerVisitMarkerTimeout.
func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) {
	visitMarkerManager = NewVisitMarkerManager(
		userBucket,
		userLogger,
		c.ringLifecyclerID,
		NewCleanerVisitMarker(c.ringLifecyclerID),
		c.cleanerVisitMarkerReadFailed,
		c.cleanerVisitMarkerWriteFailed,
	)

	existing := &CleanerVisitMarker{}
	readErr := visitMarkerManager.ReadVisitMarker(ctx, existing)

	switch {
	case readErr == nil:
		// A marker was found: the tenant counts as visited only while that
		// marker is still in progress and has not timed out.
		return visitMarkerManager, existing.IsVisited(c.cleanerVisitMarkerTimeout), nil
	case errors.Is(readErr, errorVisitMarkerNotFound):
		// No marker yet, so nobody is currently visiting this tenant.
		return visitMarkerManager, false, nil
	default:
		return nil, false, errors.Wrapf(readErr, "failed to read cleaner visit marker")
	}
}

// Remove blocks and remaining data for tenant marked for deletion.
func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) error {

Expand Down
36 changes: 30 additions & 6 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)

// Clean User with no error
cleaner.bucketClient = bkt
Expand Down Expand Up @@ -194,7 +194,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -355,7 +355,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -419,7 +419,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -477,7 +477,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true))
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)

assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
Expand All @@ -628,6 +628,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Existing behaviour - retention period disabled.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-1"] = 0
cfgProvider.userRetentionPeriods["user-2"] = 0

Expand Down Expand Up @@ -662,6 +666,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Retention enabled only for a single user, but does nothing.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-1"] = 9 * time.Hour

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
Expand All @@ -677,6 +685,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
// Retention enabled only for a single user, marking a single block.
// Note the block won't be deleted yet due to deletion delay.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-1"] = 7 * time.Hour

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
Expand Down Expand Up @@ -710,6 +722,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Marking the block again, before the deletion occurs, should not cause an error.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false))
Expand All @@ -722,6 +738,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Reduce the deletion delay. Now the block will be deleted.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cleaner.cfg.DeletionDelay = 0

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
Expand Down Expand Up @@ -755,6 +775,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Retention enabled for other user; test deleting multiple blocks.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-2"] = 5 * time.Hour

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
Expand Down
68 changes: 68 additions & 0 deletions pkg/compactor/cleaner_visit_marker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package compactor

import (
"path"
"time"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

const (
// CleanerVisitMarkerName is the file name of the cleaner visit marker.
CleanerVisitMarkerName = "cleaner-visit-marker.json"
// CleanerVisitMarkerVersion1 is the version written into cleaner visit
// marker files created by NewCleanerVisitMarker.
CleanerVisitMarkerVersion1 = 1
)

// CleanerVisitMarker is the JSON payload of the cleaner visit marker file. It
// records which compactor last updated the marker, the status it reported,
// and when that update happened.
type CleanerVisitMarker struct {
// CompactorID is the identifier of the owner that last updated the marker.
CompactorID string `json:"compactorID"`
// Status is the visit status set by the last update.
Status VisitStatus `json:"status"`
// VisitTime is a unix timestamp (seconds) of when the marker was last updated.
VisitTime int64 `json:"visitTime"`
// Version of the file.
Version int `json:"version"`
}

// NewCleanerVisitMarker returns a marker owned by compactorID and stamped with
// CleanerVisitMarkerVersion1. Status and VisitTime are left at their zero
// values.
func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker {
	marker := new(CleanerVisitMarker)
	marker.CompactorID = compactorID
	marker.Version = CleanerVisitMarkerVersion1
	return marker
}

// IsExpired reports whether at least cleanerVisitMarkerTimeout has elapsed
// since VisitTime, i.e. the current time is at or past VisitTime plus the
// timeout.
func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool {
	expiresAt := time.Unix(b.VisitTime, 0).Add(cleanerVisitMarkerTimeout)
	stillValid := time.Now().Before(expiresAt)
	return !stillValid
}

// IsVisited reports whether the marker describes a visit that is still in
// progress: its status is neither Completed nor Failed, and it has not expired
// under cleanerVisitMarkerTimeout.
func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool {
	switch b.GetStatus() {
	case Completed, Failed:
		// A finished visit, successful or not, no longer holds the tenant.
		return false
	}
	return !b.IsExpired(cleanerVisitMarkerTimeout)
}

// GetStatus returns the status currently stored in the marker.
func (b *CleanerVisitMarker) GetStatus() VisitStatus {
return b.Status
}

// GetVisitMarkerFilePath returns the path of the cleaner visit marker file.
// It does not depend on the marker's contents and simply delegates to
// GetCleanerVisitMarkerFilePath.
func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string {
return GetCleanerVisitMarkerFilePath()
}

// UpdateStatus records ownerIdentifier as the marker's compactor, stores the
// given status, and refreshes VisitTime to the current unix time in seconds.
func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus) {
	now := time.Now()

	b.VisitTime = now.Unix()
	b.Status = status
	b.CompactorID = ownerIdentifier
}

func (b *CleanerVisitMarker) LogInfo() []string {
return []string{
"compactor_id",
b.CompactorID,
"status",
string(b.Status),
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
"visit_time",
time.Unix(b.VisitTime, 0).String(),
}
}

// GetCleanerVisitMarkerFilePath returns the slash-separated path of the
// cleaner visit marker file: CleanerVisitMarkerName joined under
// bucketindex.MarkersPathname.
func GetCleanerVisitMarkerFilePath() string {
return path.Join(bucketindex.MarkersPathname, CleanerVisitMarkerName)
}
Loading
Loading