Skip to content

Commit

Permalink
Fix TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod flakiness (#894)
Browse files Browse the repository at this point in the history

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored Jan 26, 2022
1 parent 6a616ce commit 5c24428
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 31 deletions.
19 changes: 11 additions & 8 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@ import (
util_log "github.com/grafana/mimir/pkg/util/log"
)

const (
// defaultDeleteBlocksConcurrency is the number of goroutines used to delete
// blocks concurrently when BlocksCleanerConfig.DeleteBlocksConcurrency is not
// set explicitly (see the compactor wiring below, which passes this value).
defaultDeleteBlocksConcurrency = 16
)

type BlocksCleanerConfig struct {
DeletionDelay time.Duration
CleanupInterval time.Duration
CleanupConcurrency int
TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
DeletionDelay time.Duration
CleanupInterval time.Duration
CleanupConcurrency int
TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
DeleteBlocksConcurrency int
}

type BlocksCleaner struct {
Expand Down Expand Up @@ -360,8 +365,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) (returnErr
return nil
}

const deleteBlocksConcurrency = 16

// Concurrently deletes blocks marked for deletion, and removes blocks from index.
func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *bucketindex.Index, userBucket objstore.Bucket, userLogger log.Logger) {
blocksToDelete := make([]ulid.ULID, 0, len(idx.BlockDeletionMarks))
Expand All @@ -377,7 +380,7 @@ func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *
var mu sync.Mutex

// We don't want to return errors from our function, as that would stop ForEach loop early.
_ = concurrency.ForEachJob(ctx, len(blocksToDelete), deleteBlocksConcurrency, func(ctx context.Context, jobIdx int) error {
_ = concurrency.ForEachJob(ctx, len(blocksToDelete), c.cfg.DeleteBlocksConcurrency, func(ctx context.Context, jobIdx int) error {
blockID := blocksToDelete[jobIdx]

if err := block.Delete(ctx, userLogger, userBucket, blockID); err != nil {
Expand Down Expand Up @@ -415,7 +418,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
var mu sync.Mutex

// We don't want to return errors from our function, as that would stop ForEach loop early.
_ = concurrency.ForEachJob(ctx, len(blocks), deleteBlocksConcurrency, func(ctx context.Context, jobIdx int) error {
_ = concurrency.ForEachJob(ctx, len(blocks), c.cfg.DeleteBlocksConcurrency, func(ctx context.Context, jobIdx int) error {
blockID := blocks[jobIdx]

// We can safely delete only partial blocks with a deletion mark.
Expand Down
44 changes: 25 additions & 19 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here")))

cfg := BlocksCleanerConfig{
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: options.concurrency,
TenantCleanupDelay: options.tenantDeletionDelay,
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: options.concurrency,
TenantCleanupDelay: options.tenantDeletionDelay,
DeleteBlocksConcurrency: 1,
}

reg := prometheus.NewPedanticRegistry()
Expand Down Expand Up @@ -231,9 +232,10 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
}

cfg := BlocksCleanerConfig{
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeleteBlocksConcurrency: 1,
}

logger := log.NewNopLogger()
Expand Down Expand Up @@ -290,9 +292,10 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
require.NoError(t, bucketClient.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid!}")))

cfg := BlocksCleanerConfig{
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeleteBlocksConcurrency: 1,
}

logger := log.NewNopLogger()
Expand Down Expand Up @@ -338,9 +341,10 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
createTSDBBlock(t, bucketClient, "user-2", 30, 40, 2, nil)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeleteBlocksConcurrency: 1,
}

ctx := context.Background()
Expand Down Expand Up @@ -405,9 +409,10 @@ func TestBlocksCleaner_ShouldNotCleanupUserThatDoesntBelongToShardAnymore(t *tes
createTSDBBlock(t, bucketClient, "user-2", 20, 30, 2, nil)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeleteBlocksConcurrency: 1,
}

ctx := context.Background()
Expand Down Expand Up @@ -518,9 +523,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
block4 := createTSDBBlock(t, bucketClient, "user-2", ts(-8), ts(-6), 2, nil)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeleteBlocksConcurrency: 1,
}

ctx := context.Background()
Expand Down
9 changes: 5 additions & 4 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,11 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {

// Create the blocks cleaner (service).
c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
DeletionDelay: c.compactorCfg.DeletionDelay,
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
DeletionDelay: c.compactorCfg.DeletionDelay,
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency,
}, c.bucketClient, c.shardingStrategy.blocksCleanerOwnUser, c.cfgProvider, c.parentLogger, c.registerer)

// Start blocks cleaner asynchronously, don't wait until initial cleanup is finished.
Expand Down

0 comments on commit 5c24428

Please sign in to comment.