Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce cleaner visit marker #6113

Merged
merged 18 commits into from
Jul 31, 2024
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
* [FEATURE] Compactor: Introduce cleaner visit marker. #6113
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
Expand Down
65 changes: 60 additions & 5 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ type BlocksCleaner struct {
bucketClient objstore.InstrumentedBucket
usersScanner *cortex_tsdb.UsersScanner

ringLifecyclerID string

// Keep track of the last owned users.
lastOwnedUsers []string

cleanerVisitMarkerTimeout time.Duration
cleanerVisitMarkerFileUpdateInterval time.Duration

// Metrics.
runsStarted *prometheus.CounterVec
runsCompleted *prometheus.CounterVec
Expand All @@ -61,6 +66,8 @@ type BlocksCleaner struct {
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion *prometheus.CounterVec
CleanerVisitMarkerReadFailed prometheus.Counter
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
CleanerVisitMarkerWriteFailed prometheus.Counter
tenantBlocks *prometheus.GaugeVec
tenantBlocksMarkedForDelete *prometheus.GaugeVec
tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
Expand All @@ -77,14 +84,19 @@ func NewBlocksCleaner(
cfgProvider ConfigProvider,
logger log.Logger,
reg prometheus.Registerer,
cleanerVisitMarkerTimeout time.Duration,
cleanerVisitMarkerFileUpdateInterval time.Duration,
blocksMarkedForDeletion *prometheus.CounterVec,
) *BlocksCleaner {
c := &BlocksCleaner{
cfg: cfg,
bucketClient: bucketClient,
usersScanner: usersScanner,
cfgProvider: cfgProvider,
logger: log.With(logger, "component", "cleaner"),
cfg: cfg,
bucketClient: bucketClient,
usersScanner: usersScanner,
cfgProvider: cfgProvider,
logger: log.With(logger, "component", "cleaner"),
ringLifecyclerID: "default-cleaner",
cleanerVisitMarkerTimeout: cleanerVisitMarkerTimeout,
cleanerVisitMarkerFileUpdateInterval: cleanerVisitMarkerFileUpdateInterval,
runsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_compactor_block_cleanup_started_total",
Help: "Total number of blocks cleanup runs started.",
Expand All @@ -110,6 +122,14 @@ func NewBlocksCleaner(
Help: "Total number of blocks failed to be deleted.",
}),
blocksMarkedForDeletion: blocksMarkedForDeletion,
CleanerVisitMarkerReadFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_cleaner_visit_marker_read_failed",
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
Help: "Number of cleaner visit marker file failed to be read.",
}),
CleanerVisitMarkerWriteFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_cleaner_visit_marker_write_failed",
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
Help: "Number of cleaner visit marker file failed to be written.",
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
}),
alexqyle marked this conversation as resolved.
Show resolved Hide resolved

// The following metrics don't have the "cortex_compactor" prefix because not strictly related to
// the compactor. They're just tracked by the compactor because it's the most logical place where these
Expand Down Expand Up @@ -154,6 +174,10 @@ type cleanerJob struct {
timestamp int64
}

func (c *BlocksCleaner) SetRingLifecyclerID(ringLifecyclerID string) {
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
c.ringLifecyclerID = ringLifecyclerID
}

func (c *BlocksCleaner) starting(ctx context.Context) error {
// Run a cleanup so that any other service depending on this service
// is guaranteed to start once the initial cleanup has been done.
Expand Down Expand Up @@ -246,7 +270,15 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string,
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
visitMarkerManager, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
if err != nil {
return err
}
if visitMarkerManager == nil {
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
errChan <- nil
}()
Expand All @@ -273,7 +305,15 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
visitMarkerManager, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
if err != nil {
return err
}
if visitMarkerManager == nil {
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
defer func() {
errChan <- nil
}()
Expand Down Expand Up @@ -307,6 +347,21 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
return users, deleted, nil
}

// obtainVisitMarkerManager attempts to take ownership of the per-tenant
// cleaner visit marker.
//
// It returns:
//   - (manager, nil) when the tenant is free to clean: no marker exists,
//     or the existing marker is completed/failed/expired;
//   - (nil, nil) when another cleaner currently holds a live visit marker
//     for this tenant, so the caller should skip it;
//   - (nil, err) when the existing marker could not be read.
func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (*VisitMarkerManager, error) {
	cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID)
	visitMarkerManager := NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker, c.CleanerVisitMarkerReadFailed, c.CleanerVisitMarkerWriteFailed)

	existingCleanerVisitMarker := &CleanerVisitMarker{}
	err := visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker)
	switch {
	case err != nil && !errors.Is(err, ErrorVisitMarkerNotFound):
		// Wrap (not Wrapf) — the message has no format verbs.
		return nil, errors.Wrap(err, "failed to read cleaner visit marker")
	case errors.Is(err, ErrorVisitMarkerNotFound) || !existingCleanerVisitMarker.IsVisited(c.cleanerVisitMarkerTimeout):
		return visitMarkerManager, nil
	default:
		// A live marker owned by another cleaner exists; skip this tenant.
		return nil, nil
	}
}

// Remove blocks and remaining data for tenant marked for deletion.
func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) error {

Expand Down
12 changes: 6 additions & 6 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)

// Clean User with no error
cleaner.bucketClient = bkt
Expand Down Expand Up @@ -194,7 +194,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -355,7 +355,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -419,7 +419,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -477,7 +477,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true))
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)

assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
Expand Down
98 changes: 98 additions & 0 deletions pkg/compactor/cleaner_visit_marker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package compactor

import (
"path"
"time"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

const (
	// CleanerVisitMarkerName is the name of cleaner visit marker file.
	CleanerVisitMarkerName = "cleaner-visit-marker.json"
	// CleanerVisitMarkerVersion1 is the current supported version of cleaner visit marker file.
	CleanerVisitMarkerVersion1 = 1
)

// CleanerVisitMarker records which compactor's cleaner currently (or most
// recently) visited a tenant, so that concurrent cleaners can detect and
// skip tenants already being handled.
type CleanerVisitMarker struct {
	// CompactorID identifies the cleaner that owns the visit (the compactor
	// ring lifecycler ID).
	CompactorID string `json:"compactorID"`
	// Status is the current phase of the visit (Pending, InProgress,
	// Completed or Failed).
	Status VisitStatus `json:"status"`
	// VisitTime is a unix timestamp of when the partition was visited (mark updated).
	VisitTime int64 `json:"visitTime"`
	// Version of the file.
	Version int `json:"version"`
}

// NewCleanerVisitMarker builds a fresh visit marker owned by the given
// compactor at the current marker file version. Status and VisitTime stay
// at their zero values until one of the Mark* methods is called.
func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker {
	marker := CleanerVisitMarker{Version: CleanerVisitMarkerVersion1}
	marker.CompactorID = compactorID
	return &marker
}

// IsExpired reports whether at least cleanerVisitMarkerTimeout has elapsed
// since the last visit time, i.e. the owning cleaner is presumed gone and
// the marker may be taken over.
func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool {
	lastVisit := time.Unix(b.VisitTime, 0)
	return time.Since(lastVisit) >= cleanerVisitMarkerTimeout
}

// IsVisited reports whether the marker represents a live visit: one that
// has neither finished (completed or failed) nor timed out.
func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool {
	if b.IsCompleted() || b.IsFailed() {
		return false
	}
	return !b.IsExpired(cleanerVisitMarkerTimeout)
}

// IsCompleted reports whether the visit finished successfully.
func (b *CleanerVisitMarker) IsCompleted() bool {
	return b.Status == Completed
}

// IsFailed reports whether the visit ended in failure.
func (b *CleanerVisitMarker) IsFailed() bool {
	return b.Status == Failed
}

// IsInProgress reports whether the visit is currently being worked on.
func (b *CleanerVisitMarker) IsInProgress() bool {
	return b.Status == InProgress
}

// IsPending reports whether the visit has been claimed but not yet started.
func (b *CleanerVisitMarker) IsPending() bool {
	return b.Status == Pending
}

// GetVisitMarkerFilePath returns the bucket-relative path of the cleaner
// visit marker file. It delegates to the package-level helper; presumably
// this method satisfies a visit-marker interface shared with other marker
// types — confirm against VisitMarkerManager.
func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string {
	return GetCleanerVisitMarkerFilePath()
}

// MarkInProgress stamps the marker as in progress, recording
// ownerIdentifier as the owner and now as the visit time.
func (b *CleanerVisitMarker) MarkInProgress(ownerIdentifier string) {
	b.Status = InProgress
	b.VisitTime = time.Now().Unix()
	b.CompactorID = ownerIdentifier
}

// MarkPending stamps the marker as pending, recording ownerIdentifier as
// the owner and now as the visit time.
func (b *CleanerVisitMarker) MarkPending(ownerIdentifier string) {
	b.Status = Pending
	b.VisitTime = time.Now().Unix()
	b.CompactorID = ownerIdentifier
}

// MarkCompleted stamps the marker as completed, recording ownerIdentifier
// as the owner and now as the visit time.
func (b *CleanerVisitMarker) MarkCompleted(ownerIdentifier string) {
	b.Status = Completed
	b.VisitTime = time.Now().Unix()
	b.CompactorID = ownerIdentifier
}

// MarkFailed stamps the marker as failed, recording ownerIdentifier as
// the owner and now as the visit time.
func (b *CleanerVisitMarker) MarkFailed(ownerIdentifier string) {
	b.Status = Failed
	b.VisitTime = time.Now().Unix()
	b.CompactorID = ownerIdentifier
}

func (b *CleanerVisitMarker) LogInfo() []string {
return []string{
"compactor_id",
b.CompactorID,
"status",
string(b.Status),
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
"visit_time",
time.Unix(b.VisitTime, 0).String(),
}
}

// GetCleanerVisitMarkerFilePath returns the bucket-relative location of
// the cleaner visit marker file, placed under the bucket-index markers
// directory (bucketindex.MarkersPathname).
func GetCleanerVisitMarkerFilePath() string {
	return path.Join(bucketindex.MarkersPathname, CleanerVisitMarkerName)
}
12 changes: 11 additions & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ type Config struct {
BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"`
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`

// Cleaner visit marker file config
CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"`
CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"`

AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
}
Expand Down Expand Up @@ -255,6 +259,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.")
f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.")

f.DurationVar(&cfg.CleanerVisitMarkerTimeout, "compactor.cleaner-visit-marker-timeout", 10*time.Minute, "How long cleaner visit marker file should be considered as expired and able to be picked up by cleaner again.")
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
f.DurationVar(&cfg.CleanerVisitMarkerFileUpdateInterval, "compactor.cleaner-visit-marker-file-update-interval", 5*time.Minute, "How frequently cleaner visit marker file should be updated when cleaning user.")

f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")
}
Expand Down Expand Up @@ -529,7 +536,8 @@ func (c *Compactor) starting(ctx context.Context) error {
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer, c.compactorMetrics.syncerBlocksMarkedForDeletion)
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
c.compactorMetrics.syncerBlocksMarkedForDeletion)

// Initialize the compactors ring if sharding is enabled.
if c.compactorCfg.ShardingEnabled {
Expand All @@ -539,6 +547,8 @@ func (c *Compactor) starting(ctx context.Context) error {
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
}

c.blocksCleaner.SetRingLifecyclerID(c.ringLifecycler.ID)
alexqyle marked this conversation as resolved.
Show resolved Hide resolved

c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
if err != nil {
return errors.Wrap(err, "unable to initialize compactor ring")
Expand Down
Loading
Loading