Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce cleaner visit marker #6113

Merged
merged 18 commits into from
Jul 31, 2024
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* [ENHANCEMENT] Compactor: Differentiate retry and halt error and retry failed compaction only on retriable error. #6111
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040
* [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112
* [ENHANCEMENT] Compactor: Introduce cleaner visit marker. #6113
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018
Expand Down
11 changes: 11 additions & 0 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,17 @@ compactor:
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]

# How long after its last update a cleaner visit marker file is considered
# expired, allowing the user to be picked up by a cleaner again. The value
# should be smaller than -compactor.cleanup-interval.
# CLI flag: -compactor.cleaner-visit-marker-timeout
[cleaner_visit_marker_timeout: <duration> | default = 10m]
alexqyle marked this conversation as resolved.
Show resolved Hide resolved

# How frequently the cleaner visit marker file should be updated while a user
# is being cleaned.
# CLI flag: -compactor.cleaner-visit-marker-file-update-interval
[cleaner_visit_marker_file_update_interval: <duration> | default = 5m]

# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]
Expand Down
10 changes: 10 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2232,6 +2232,16 @@ sharding_ring:
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]

# How long after its last update a cleaner visit marker file is considered
# expired, allowing the user to be picked up by a cleaner again. The value
# should be smaller than -compactor.cleanup-interval.
# CLI flag: -compactor.cleaner-visit-marker-timeout
[cleaner_visit_marker_timeout: <duration> | default = 10m]

# How frequently the cleaner visit marker file should be updated while a user is being cleaned.
# CLI flag: -compactor.cleaner-visit-marker-file-update-interval
[cleaner_visit_marker_file_update_interval: <duration> | default = 5m]

# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]
Expand Down
50 changes: 45 additions & 5 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ type BlocksCleaner struct {
bucketClient objstore.InstrumentedBucket
usersScanner *cortex_tsdb.UsersScanner

ringLifecyclerID string

// Keep track of the last owned users.
lastOwnedUsers []string

cleanerVisitMarkerTimeout time.Duration
cleanerVisitMarkerFileUpdateInterval time.Duration

// Metrics.
runsStarted *prometheus.CounterVec
runsCompleted *prometheus.CounterVec
Expand All @@ -76,15 +81,21 @@ func NewBlocksCleaner(
usersScanner *cortex_tsdb.UsersScanner,
cfgProvider ConfigProvider,
logger log.Logger,
ringLifecyclerID string,
reg prometheus.Registerer,
cleanerVisitMarkerTimeout time.Duration,
cleanerVisitMarkerFileUpdateInterval time.Duration,
blocksMarkedForDeletion *prometheus.CounterVec,
) *BlocksCleaner {
c := &BlocksCleaner{
cfg: cfg,
bucketClient: bucketClient,
usersScanner: usersScanner,
cfgProvider: cfgProvider,
logger: log.With(logger, "component", "cleaner"),
cfg: cfg,
bucketClient: bucketClient,
usersScanner: usersScanner,
cfgProvider: cfgProvider,
logger: log.With(logger, "component", "cleaner"),
ringLifecyclerID: ringLifecyclerID,
cleanerVisitMarkerTimeout: cleanerVisitMarkerTimeout,
cleanerVisitMarkerFileUpdateInterval: cleanerVisitMarkerFileUpdateInterval,
runsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_compactor_block_cleanup_started_total",
Help: "Total number of blocks cleanup runs started.",
Expand Down Expand Up @@ -246,7 +257,15 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string,
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
if err != nil {
return err
}
if isVisited {
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
errChan <- nil
}()
Expand All @@ -273,7 +292,15 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
if err != nil {
return err
}
if isVisited {
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
defer func() {
errChan <- nil
}()
Expand Down Expand Up @@ -307,6 +334,19 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
return users, deleted, nil
}

// obtainVisitMarkerManager builds a VisitMarkerManager for the given user
// bucket and reports whether the user is currently marked as visited.
//
// It reads the existing cleaner visit marker through the new manager:
//   - if the marker is read successfully, isVisited is the marker's
//     IsVisited result for the configured cleaner visit marker timeout;
//   - if the marker is not found, isVisited is false;
//   - on any other read error, a nil manager and the wrapped error are returned.
func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) {
	visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, NewCleanerVisitMarker(c.ringLifecyclerID))

	var existing CleanerVisitMarker
	readErr := visitMarkerManager.ReadVisitMarker(ctx, &existing)
	switch {
	case readErr == nil:
		// A marker exists; its status and age decide whether to skip the user.
		return visitMarkerManager, existing.IsVisited(c.cleanerVisitMarkerTimeout), nil
	case errors.Is(readErr, errorVisitMarkerNotFound):
		// No marker yet: nothing has claimed this user.
		return visitMarkerManager, false, nil
	default:
		return nil, false, errors.Wrapf(readErr, "failed to read cleaner visit marker")
	}
}

// Remove blocks and remaining data for tenant marked for deletion.
func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) error {

Expand Down
36 changes: 30 additions & 6 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)

// Clean User with no error
cleaner.bucketClient = bkt
Expand Down Expand Up @@ -194,7 +194,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -355,7 +355,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -419,7 +419,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -477,7 +477,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true))
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)

assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
Expand All @@ -628,6 +628,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Existing behaviour - retention period disabled.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-1"] = 0
cfgProvider.userRetentionPeriods["user-2"] = 0

Expand Down Expand Up @@ -662,6 +666,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Retention enabled only for a single user, but does nothing.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-1"] = 9 * time.Hour

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
Expand All @@ -677,6 +685,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
// Retention enabled only for a single user, marking a single block.
// Note the block won't be deleted yet due to deletion delay.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-1"] = 7 * time.Hour

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
Expand Down Expand Up @@ -710,6 +722,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Marking the block again, before the deletion occurs, should not cause an error.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false))
Expand All @@ -722,6 +738,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Reduce the deletion delay. Now the block will be deleted.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cleaner.cfg.DeletionDelay = 0

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
Expand Down Expand Up @@ -755,6 +775,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Retention enabled for other user; test deleting multiple blocks.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-2"] = 5 * time.Hour

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
Expand Down
66 changes: 66 additions & 0 deletions pkg/compactor/cleaner_visit_marker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package compactor

import (
"fmt"
"path"
"time"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

const (
// CleanerVisitMarkerName is the file name of the cleaner visit marker; see
// GetCleanerVisitMarkerFilePath for the full path it is stored under.
CleanerVisitMarkerName = "cleaner-visit-marker.json"
// CleanerVisitMarkerVersion1 is the current supported version of the cleaner visit marker file.
CleanerVisitMarkerVersion1 = 1
)

// CleanerVisitMarker is the JSON-serialized content of the cleaner visit
// marker file. It records who last updated the marker, with which status,
// and when.
type CleanerVisitMarker struct {
// CompactorID is the identifier of the owner that last updated the marker.
CompactorID string `json:"compactorID"`
// Status is the visit status recorded at the last update.
Status VisitStatus `json:"status"`
// VisitTime is a unix timestamp (seconds) of when the marker was last updated.
VisitTime int64 `json:"visitTime"`
// Version of the file.
Version int `json:"version"`
}

// NewCleanerVisitMarker returns a marker owned by compactorID and stamped
// with CleanerVisitMarkerVersion1. Status and VisitTime are left at their
// zero values.
func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker {
	marker := new(CleanerVisitMarker)
	marker.CompactorID = compactorID
	marker.Version = CleanerVisitMarkerVersion1
	return marker
}

// IsExpired reports whether at least cleanerVisitMarkerTimeout has elapsed
// since VisitTime. A marker whose deadline is exactly now counts as expired.
func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool {
	now := time.Now()
	deadline := time.Unix(b.VisitTime, 0).Add(cleanerVisitMarkerTimeout)
	return now.After(deadline) || now.Equal(deadline)
}

// IsVisited reports whether the marker still claims its target: the status is
// neither Completed nor Failed, and the marker has not expired with respect to
// cleanerVisitMarkerTimeout.
func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool {
	status := b.GetStatus()
	if status == Completed || status == Failed {
		// A finished visit (successful or not) no longer blocks other cleaners.
		return false
	}
	return !b.IsExpired(cleanerVisitMarkerTimeout)
}

// GetStatus returns the visit status currently stored in the marker.
func (b *CleanerVisitMarker) GetStatus() VisitStatus {
return b.Status
}

// GetVisitMarkerFilePath returns the path of the cleaner visit marker file.
// The path does not depend on the receiver's fields; it delegates to
// GetCleanerVisitMarkerFilePath.
func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string {
return GetCleanerVisitMarkerFilePath()
}

// UpdateStatus records ownerIdentifier as the marker's CompactorID, stores the
// given status, and refreshes VisitTime to the current unix time in seconds.
func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus) {
	b.CompactorID, b.Status = ownerIdentifier, status
	b.VisitTime = time.Now().Unix()
}

// String renders the marker as space-separated key=value pairs
// (compactor_id, status, visit_time), with the visit time formatted
// via time.Time.String.
func (b *CleanerVisitMarker) String() string {
	visitedAt := time.Unix(b.VisitTime, 0)
	return fmt.Sprintf("compactor_id=%s status=%s visit_time=%s", b.CompactorID, b.Status, visitedAt.String())
}

// GetCleanerVisitMarkerFilePath returns the slash-separated path of the
// cleaner visit marker file: CleanerVisitMarkerName joined under
// bucketindex.MarkersPathname.
func GetCleanerVisitMarkerFilePath() string {
return path.Join(bucketindex.MarkersPathname, CleanerVisitMarkerName)
}
29 changes: 20 additions & 9 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ type Config struct {
BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"`
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`

// Cleaner visit marker file config
CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"`
CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"`

AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
}
Expand Down Expand Up @@ -255,6 +259,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.")
f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.")

f.DurationVar(&cfg.CleanerVisitMarkerTimeout, "compactor.cleaner-visit-marker-timeout", 10*time.Minute, "How long cleaner visit marker file should be considered as expired and able to be picked up by cleaner again. The value should be smaller than -compactor.cleanup-interval")
f.DurationVar(&cfg.CleanerVisitMarkerFileUpdateInterval, "compactor.cleaner-visit-marker-file-update-interval", 5*time.Minute, "How frequently cleaner visit marker file should be updated when cleaning user.")

f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")
}
Expand Down Expand Up @@ -522,15 +529,7 @@ func (c *Compactor) starting(ctx context.Context) error {
// Create the users scanner.
c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger)

// Create the blocks cleaner (service).
c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
DeletionDelay: c.compactorCfg.DeletionDelay,
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer, c.compactorMetrics.syncerBlocksMarkedForDeletion)

var cleanerRingLifecyclerID = "default-cleaner"
// Initialize the compactors ring if sharding is enabled.
if c.compactorCfg.ShardingEnabled {
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
Expand All @@ -539,6 +538,8 @@ func (c *Compactor) starting(ctx context.Context) error {
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
}

cleanerRingLifecyclerID = c.ringLifecycler.ID

c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
if err != nil {
return errors.Wrap(err, "unable to initialize compactor ring")
Expand Down Expand Up @@ -588,6 +589,16 @@ func (c *Compactor) starting(ctx context.Context) error {
}
}

// Create the blocks cleaner (service).
c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
DeletionDelay: c.compactorCfg.DeletionDelay,
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
c.compactorMetrics.syncerBlocksMarkedForDeletion)

// Ensure an initial cleanup occurred before starting the compactor.
if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {
c.ringSubservices.StopAsync()
Expand Down
Loading
Loading