From f754a94a2030183787713246812a4f7353eb2495 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 23 Jul 2024 17:27:30 -0700 Subject: [PATCH 01/17] Introduce cleaner visit marker to make sure cleaning cycle for one tenant can only be run on one cleaner at a time Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner.go | 65 +++++- pkg/compactor/blocks_cleaner_test.go | 12 +- pkg/compactor/cleaner_visit_marker.go | 98 +++++++++ pkg/compactor/compactor.go | 12 +- pkg/compactor/compactor_test.go | 43 +++- pkg/compactor/visit_marker.go | 220 +++++++++++++++++++ pkg/compactor/visit_marker_test.go | 294 ++++++++++++++++++++++++++ 7 files changed, 730 insertions(+), 14 deletions(-) create mode 100644 pkg/compactor/cleaner_visit_marker.go create mode 100644 pkg/compactor/visit_marker.go create mode 100644 pkg/compactor/visit_marker_test.go diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index a447231b64..f5d17a0a00 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -50,9 +50,14 @@ type BlocksCleaner struct { bucketClient objstore.InstrumentedBucket usersScanner *cortex_tsdb.UsersScanner + ringLifecyclerID string + // Keep track of the last owned users. lastOwnedUsers []string + cleanerVisitMarkerTimeout time.Duration + cleanerVisitMarkerFileUpdateInterval time.Duration + // Metrics. runsStarted *prometheus.CounterVec runsCompleted *prometheus.CounterVec @@ -61,6 +66,8 @@ type BlocksCleaner struct { blocksCleanedTotal prometheus.Counter blocksFailedTotal prometheus.Counter blocksMarkedForDeletion *prometheus.CounterVec + CleanerVisitMarkerReadFailed prometheus.Counter + CleanerVisitMarkerWriteFailed prometheus.Counter tenantBlocks *prometheus.GaugeVec tenantBlocksMarkedForDelete *prometheus.GaugeVec tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec @@ -77,14 +84,19 @@ func NewBlocksCleaner( cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer, + cleanerVisitMarkerTimeout time.Duration, + cleanerVisitMarkerFileUpdateInterval time.Duration, blocksMarkedForDeletion *prometheus.CounterVec, ) *BlocksCleaner { c := &BlocksCleaner{ - cfg: cfg, - bucketClient: bucketClient, - usersScanner: usersScanner, - cfgProvider: cfgProvider, - logger: log.With(logger, "component", "cleaner"), + cfg: cfg, + bucketClient: bucketClient, + usersScanner: usersScanner, + cfgProvider: cfgProvider, + logger: log.With(logger, "component", "cleaner"), + ringLifecyclerID: "default-cleaner", + cleanerVisitMarkerTimeout: cleanerVisitMarkerTimeout, + cleanerVisitMarkerFileUpdateInterval: cleanerVisitMarkerFileUpdateInterval, runsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_compactor_block_cleanup_started_total", Help: "Total number of blocks cleanup runs started.", @@ -110,6 +122,14 @@ func NewBlocksCleaner( Help: "Total number of blocks failed to be deleted.", }), blocksMarkedForDeletion: blocksMarkedForDeletion, + CleanerVisitMarkerReadFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_cleaner_visit_marker_read_failed", + Help: "Number of cleaner visit marker file failed to be read.", + }), + CleanerVisitMarkerWriteFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_cleaner_visit_marker_write_failed", + Help: "Number of cleaner visit marker file failed to be written.", + }), // The following metrics don't have the "cortex_compactor" prefix because not strictly related to // the compactor. They're just tracked by the compactor because it's the most logical place where these @@ -154,6 +174,10 @@ type cleanerJob struct { timestamp int64 } +func (c *BlocksCleaner) SetRingLifecyclerID(ringLifecyclerID string) { + c.ringLifecyclerID = ringLifecyclerID +} + func (c *BlocksCleaner) starting(ctx context.Context) error { // Run a cleanup so that any other service depending on this service // is guaranteed to start once the initial cleanup has been done. @@ -246,7 +270,15 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string, return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { userLogger := util_log.WithUserID(userID, c.logger) userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) + visitMarkerManager, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket) + if err != nil { + return err + } + if visitMarkerManager == nil { + return nil + } errChan := make(chan error, 1) + go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true) defer func() { errChan <- nil }() @@ -273,7 +305,15 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { userLogger := util_log.WithUserID(userID, c.logger) userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) + visitMarkerManager, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket) + if err != nil { + return err + } + if visitMarkerManager == nil { + return nil + } errChan := make(chan error, 1) + go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true) defer func() { errChan <- nil }() @@ -307,6 +347,21 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro return users, deleted, nil } +func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (*VisitMarkerManager, error) { + cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID) + visitMarkerManager := NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker, c.CleanerVisitMarkerReadFailed, c.CleanerVisitMarkerWriteFailed) + + existingCleanerVisitMarker := &CleanerVisitMarker{} + err := visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker) + if err != nil && !errors.Is(err, ErrorVisitMarkerNotFound) { + return nil, errors.Wrapf(err, "failed to read cleaner visit marker") + } + if errors.Is(err, ErrorVisitMarkerNotFound) || !existingCleanerVisitMarker.IsVisited(c.cleanerVisitMarkerTimeout) { + return visitMarkerManager, nil + } + return nil, nil +} + // Remove blocks and remaining data for tenant marked for deletion. func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) error { diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index b582fceb7c..16bd97bb24 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -87,7 +87,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) // Clean User with no error cleaner.bucketClient = bkt @@ -194,7 +194,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions Help: blocksMarkedForDeletionHelp, }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -355,7 +355,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -419,7 +419,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -477,7 +477,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar Help: blocksMarkedForDeletionHelp, }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) require.NoError(t, err) require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true)) @@ -618,7 +618,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) assertBlockExists := func(user string, block ulid.ULID, expectExists bool) { exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename)) diff --git a/pkg/compactor/cleaner_visit_marker.go b/pkg/compactor/cleaner_visit_marker.go new file mode 100644 index 0000000000..57ba49402c --- /dev/null +++ b/pkg/compactor/cleaner_visit_marker.go @@ -0,0 +1,98 @@ +package compactor + +import ( + "path" + "time" + + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" +) + +const ( + // CleanerVisitMarkerName is the name of cleaner visit marker file. + CleanerVisitMarkerName = "cleaner-visit-marker.json" + // CleanerVisitMarkerVersion1 is the current supported version of cleaner visit mark file. + CleanerVisitMarkerVersion1 = 1 +) + +type CleanerVisitMarker struct { + CompactorID string `json:"compactorID"` + Status VisitStatus `json:"status"` + // VisitTime is a unix timestamp of when the partition was visited (mark updated). + VisitTime int64 `json:"visitTime"` + // Version of the file. + Version int `json:"version"` +} + +func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker { + return &CleanerVisitMarker{ + CompactorID: compactorID, + Version: CleanerVisitMarkerVersion1, + } +} + +func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool { + return !time.Now().Before(time.Unix(b.VisitTime, 0).Add(cleanerVisitMarkerTimeout)) +} + +func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool { + return !b.IsCompleted() && !b.IsFailed() && !b.IsExpired(cleanerVisitMarkerTimeout) +} + +func (b *CleanerVisitMarker) IsCompleted() bool { + return b.Status == Completed +} + +func (b *CleanerVisitMarker) IsFailed() bool { + return b.Status == Failed +} + +func (b *CleanerVisitMarker) IsInProgress() bool { + return b.Status == InProgress +} + +func (b *CleanerVisitMarker) IsPending() bool { + return b.Status == Pending +} + +func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string { + return GetCleanerVisitMarkerFilePath() +} + +func (b *CleanerVisitMarker) MarkInProgress(ownerIdentifier string) { + b.CompactorID = ownerIdentifier + b.Status = InProgress + b.VisitTime = time.Now().Unix() +} + +func (b *CleanerVisitMarker) MarkPending(ownerIdentifier string) { + b.CompactorID = ownerIdentifier + b.Status = Pending + b.VisitTime = time.Now().Unix() +} + +func (b *CleanerVisitMarker) MarkCompleted(ownerIdentifier string) { + b.CompactorID = ownerIdentifier + b.Status = Completed + b.VisitTime = time.Now().Unix() +} + +func (b *CleanerVisitMarker) MarkFailed(ownerIdentifier string) { + b.CompactorID = ownerIdentifier + b.Status = Failed + b.VisitTime = time.Now().Unix() +} + +func (b *CleanerVisitMarker) LogInfo() []string { + return []string{ + "compactor_id", + b.CompactorID, + "status", + string(b.Status), + "visit_time", + time.Unix(b.VisitTime, 0).String(), + } +} + +func GetCleanerVisitMarkerFilePath() string { + return path.Join(bucketindex.MarkersPathname, CleanerVisitMarkerName) +} diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 7eac246c4d..f3ce0df44c 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -216,6 +216,10 @@ type Config struct { BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"` BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"` + // Cleaner visit marker file config + CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"` + CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"` + AcceptMalformedIndex bool `yaml:"accept_malformed_index"` CachingBucketEnabled bool `yaml:"caching_bucket_enabled"` } @@ -255,6 +259,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.") f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.") + f.DurationVar(&cfg.CleanerVisitMarkerTimeout, "compactor.cleaner-visit-marker-timeout", 10*time.Minute, "How long cleaner visit marker file should be considered as expired and able to be picked up by cleaner again.") + f.DurationVar(&cfg.CleanerVisitMarkerFileUpdateInterval, "compactor.cleaner-visit-marker-file-update-interval", 5*time.Minute, "How frequently cleaner visit marker file should be updated when cleaning user.") + f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.") f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status") } @@ -529,7 +536,8 @@ func (c *Compactor) starting(ctx context.Context) error { CleanupConcurrency: c.compactorCfg.CleanupConcurrency, BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled, TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, - }, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer, c.compactorMetrics.syncerBlocksMarkedForDeletion) + }, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval, + c.compactorMetrics.syncerBlocksMarkedForDeletion) // Initialize the compactors ring if sharding is enabled. if c.compactorCfg.ShardingEnabled { @@ -539,6 +547,8 @@ func (c *Compactor) starting(ctx context.Context) error { return errors.Wrap(err, "unable to initialize compactor ring lifecycler") } + c.blocksCleaner.SetRingLifecyclerID(c.ringLifecycler.ID) + c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) if err != nil { return errors.Wrap(err, "unable to initialize compactor ring") diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 8a77b6b551..b4a1fe8463 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -173,6 +173,9 @@ func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) { bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete(userID+"/markers/cleaner-visit-marker.json", nil) bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil) @@ -239,7 +242,6 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { # HELP cortex_compactor_blocks_cleaned_total Total number of blocks deleted. # TYPE cortex_compactor_blocks_cleaned_total counter cortex_compactor_blocks_cleaned_total 0 - # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter cortex_compactor_blocks_marked_for_no_compaction_total 0 @@ -332,7 +334,6 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket # HELP cortex_compactor_blocks_cleaned_total Total number of blocks deleted. # TYPE cortex_compactor_blocks_cleaned_total counter cortex_compactor_blocks_cleaned_total 0 - # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter cortex_compactor_blocks_marked_for_no_compaction_total 0 @@ -381,6 +382,9 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete(userID+"/markers/cleaner-visit-marker.json", nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(userID), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) @@ -436,6 +440,9 @@ func TestCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) { bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) @@ -485,7 +492,13 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockGet("user-2/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-2/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-2/markers/cleaner-visit-marker.json", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) @@ -641,6 +654,10 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { "user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", }, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", nil) bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", nil) bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil) @@ -740,7 +757,13 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockGet("user-2/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-2/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-2/markers/cleaner-visit-marker.json", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", mockNoCompactBlockJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) @@ -816,6 +839,10 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) bucketClient.MockGet(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), `{"deletion_time": 1}`, nil) bucketClient.MockUpload(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTVP434PA9VFXSW2JKB3392D/index"}, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/index", "some index content", nil) @@ -979,7 +1006,13 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockGet("user-2/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-2/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-2/markers/cleaner-visit-marker.json", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) @@ -1078,6 +1111,9 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM for _, userID := range userIDs { bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete(userID+"/markers/cleaner-visit-marker.json", nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(userID), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) @@ -1212,6 +1248,9 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit bucketClient.MockIter(userID+"/", blockFiles, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete(userID+"/markers/cleaner-visit-marker.json", nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(userID), false, nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go new file mode 100644 index 0000000000..8b363998e8 --- /dev/null +++ b/pkg/compactor/visit_marker.go @@ -0,0 +1,220 @@ +package compactor + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/compact" + + "github.com/cortexproject/cortex/pkg/util/runutil" +) + +var ( + ErrorVisitMarkerNotFound = errors.New("visit marker not found") + ErrorUnmarshalVisitMarker = errors.New("unmarshal visit marker JSON") +) + +type VisitStatus string + +const ( + Pending VisitStatus = "pending" + InProgress VisitStatus = "inProgress" + Completed VisitStatus = "completed" + Failed VisitStatus = "failed" +) + +type VisitMarker interface { + GetVisitMarkerFilePath() string + MarkInProgress(ownerIdentifier string) + MarkPending(ownerIdentifier string) + MarkCompleted(ownerIdentifier string) + MarkFailed(ownerIdentifier string) + LogInfo() []string + IsExpired(visitMarkerTimeout time.Duration) bool + IsCompleted() bool + IsFailed() bool + IsInProgress() bool + IsPending() bool +} + +type VisitMarkerManager struct { + bkt objstore.InstrumentedBucket + logger log.Logger + ownerIdentifier string + visitMarker VisitMarker + visitMarkerReadFailed prometheus.Counter + visitMarkerWriteFailed prometheus.Counter + + mutex sync.Mutex +} + +func NewVisitMarkerManager( + bkt objstore.InstrumentedBucket, + logger log.Logger, + ownerIdentifier string, + visitMarker VisitMarker, + visitMarkerReadFailed prometheus.Counter, + visitMarkerWriteFailed prometheus.Counter, +) *VisitMarkerManager { + return &VisitMarkerManager{ + bkt: bkt, + logger: log.With(logger, "type", fmt.Sprintf("%T", visitMarker), visitMarker.LogInfo()), + ownerIdentifier: ownerIdentifier, + visitMarker: visitMarker, + visitMarkerReadFailed: visitMarkerReadFailed, + visitMarkerWriteFailed: visitMarkerWriteFailed, + } +} + +func (v *VisitMarkerManager) HeartBeat(ctx context.Context, errChan <-chan error, visitMarkerFileUpdateInterval time.Duration, deleteOnExit bool) { + level.Info(v.getLogger()).Log("msg", "start visit marker heart beat") + ticker := time.NewTicker(visitMarkerFileUpdateInterval) + defer ticker.Stop() +heartBeat: + for { + v.MarkInProgress(ctx) + + select { + case <-ctx.Done(): + level.Warn(v.getLogger()).Log("msg", "visit marker heart beat got cancelled") + v.MarkPending(context.Background()) + break heartBeat + case <-ticker.C: + continue + case err := <-errChan: + if err == nil { + level.Info(v.getLogger()).Log("msg", "update visit marker to completed status") + v.MarkCompleted(ctx) + } else { + level.Warn(v.getLogger()).Log("msg", "stop visit marker heart beat due to error", "err", err) + if compact.IsHaltError(err) { + level.Info(v.getLogger()).Log("msg", "update visit marker to failed status", "err", err) + v.MarkFailed(ctx) + } else { + level.Info(v.getLogger()).Log("msg", "update visit marker to pending status", "err", err) + v.MarkPending(ctx) + } + } + break heartBeat + } + } + level.Info(v.getLogger()).Log("msg", "stop visit marker heart beat") + if deleteOnExit { + level.Info(v.getLogger()).Log("msg", "delete visit marker when exiting heart beat") + v.DeleteVisitMarker(context.Background()) + } +} + +func (v *VisitMarkerManager) MarkInProgress(ctx context.Context) { + v.visitMarker.MarkInProgress(v.ownerIdentifier) + if err := v.updateVisitMarker(ctx); err != nil { + level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) + return + } + level.Debug(v.getLogger()).Log("msg", "marked in progress") +} + +func (v *VisitMarkerManager) MarkPending(ctx context.Context) { + v.visitMarker.MarkPending(v.ownerIdentifier) + if err := v.updateVisitMarker(ctx); err != nil { + level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) + return + } + level.Debug(v.getLogger()).Log("msg", "marked pending") +} + +func (v *VisitMarkerManager) MarkCompleted(ctx context.Context) { + v.visitMarker.MarkCompleted(v.ownerIdentifier) + if err := v.updateVisitMarker(ctx); err != nil { + level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) + return + } + level.Debug(v.getLogger()).Log("msg", "marked completed") +} + +func (v *VisitMarkerManager) MarkFailed(ctx context.Context) { + v.visitMarker.MarkFailed(v.ownerIdentifier) + if err := v.updateVisitMarker(ctx); err != nil { + level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) + return + } + level.Debug(v.getLogger()).Log("msg", "marked failed") +} + +func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context) { + if err := v.bkt.Delete(ctx, v.visitMarker.GetVisitMarkerFilePath()); err != nil { + level.Error(v.getLogger()).Log("msg", "failed to delete visit marker", "err", err) + return + } + level.Debug(v.getLogger()).Log("msg", "visit marker deleted") +} + +func (v *VisitMarkerManager) ReloadVisitMarker(ctx context.Context) error { + if err := v.ReadVisitMarker(ctx, v.visitMarker); err != nil { + return err + } + v.setLogger(log.With(v.getLogger(), v.visitMarker.LogInfo())) + level.Debug(v.getLogger()).Log("msg", "visit marker reloaded") + return nil +} + +func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error { + visitMarkerFile := v.visitMarker.GetVisitMarkerFilePath() + visitMarkerFileReader, err := v.bkt.ReaderWithExpectedErrs(v.bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile) + if err != nil { + if v.bkt.IsObjNotFoundErr(err) { + return errors.Wrapf(ErrorVisitMarkerNotFound, "visit marker file: %s", visitMarkerFile) + } + v.visitMarkerReadFailed.Inc() + return errors.Wrapf(err, "get visit marker file: %s", visitMarkerFile) + } + defer runutil.CloseWithLogOnErr(v.getLogger(), visitMarkerFileReader, "close visit marker reader") + b, err := io.ReadAll(visitMarkerFileReader) + if err != nil { + v.visitMarkerReadFailed.Inc() + return errors.Wrapf(err, "read visit marker file: %s", visitMarkerFile) + } + if err = json.Unmarshal(b, visitMarker); err != nil { + v.visitMarkerReadFailed.Inc() + return errors.Wrapf(ErrorUnmarshalVisitMarker, "visit marker file: %s, content: %s, error: %v", visitMarkerFile, string(b), err.Error()) + } + level.Debug(v.getLogger()).Log("msg", "visit marker read from file", "visit_marker_file", visitMarkerFile) + return nil +} + +func (v *VisitMarkerManager) updateVisitMarker(ctx context.Context) error { + visitMarkerFileContent, err := json.Marshal(v.visitMarker) + if err != nil { + v.visitMarkerWriteFailed.Inc() + return err + } + + reader := bytes.NewReader(visitMarkerFileContent) + if err := v.bkt.Upload(ctx, v.visitMarker.GetVisitMarkerFilePath(), reader); err != nil { + v.visitMarkerWriteFailed.Inc() + return err + } + return nil +} + +func (v *VisitMarkerManager) getLogger() log.Logger { + v.mutex.Lock() + defer v.mutex.Unlock() + return v.logger +} + +func (v *VisitMarkerManager) setLogger(logger log.Logger) { + v.mutex.Lock() + defer v.mutex.Unlock() + v.logger = logger +} diff --git a/pkg/compactor/visit_marker_test.go b/pkg/compactor/visit_marker_test.go new file mode 100644 index 0000000000..184ce81163 --- /dev/null +++ b/pkg/compactor/visit_marker_test.go @@ -0,0 +1,294 @@ +package compactor + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/compact" + + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" +) + +func TestMarkPending(t *testing.T) { + ctx := context.Background() + dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + + ownerIdentifier := "test-owner" + testVisitMarker := NewTestVisitMarker(ownerIdentifier) + + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) + visitMarkerManager.MarkPending(ctx) + + require.Equal(t, Pending, testVisitMarker.Status) + + visitMarkerFromFile := &TestVisitMarker{} + err := visitMarkerManager.ReadVisitMarker(ctx, visitMarkerFromFile) + require.NoError(t, err) + require.Equal(t, Pending, visitMarkerFromFile.Status) +} + +func TestMarkInProgress(t *testing.T) { + ctx := context.Background() + dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + + ownerIdentifier := "test-owner" + testVisitMarker := NewTestVisitMarker(ownerIdentifier) + + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) + visitMarkerManager.MarkInProgress(ctx) + + require.Equal(t, InProgress, testVisitMarker.Status) + + visitMarkerFromFile := &TestVisitMarker{} + err := visitMarkerManager.ReadVisitMarker(ctx, visitMarkerFromFile) + require.NoError(t, err) + require.Equal(t, InProgress, visitMarkerFromFile.Status) +} + +func TestMarkCompleted(t *testing.T) { + ctx := context.Background() + dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + + ownerIdentifier := "test-owner" + testVisitMarker := NewTestVisitMarker(ownerIdentifier) + + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) + visitMarkerManager.MarkCompleted(ctx) + + require.Equal(t, Completed, testVisitMarker.Status) + + visitMarkerFromFile := &TestVisitMarker{} + err := visitMarkerManager.ReadVisitMarker(ctx, visitMarkerFromFile) + require.NoError(t, err) + require.Equal(t, Completed, visitMarkerFromFile.Status) +} + +func TestReloadVisitMarker(t *testing.T) { + ctx := context.Background() + dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + + ownerIdentifier := "test-owner" + testVisitMarker := NewTestVisitMarker(ownerIdentifier) + + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) + + newValue := "updated stored value" + updatedVisitMarker := TestVisitMarker{ + OwnerIdentifier: ownerIdentifier, + Status: Completed, + StoredValue: newValue, + } + visitMarkerFileContent, err := json.Marshal(updatedVisitMarker) + require.NoError(t, err) + + reader := bytes.NewReader(visitMarkerFileContent) + err = bkt.Upload(ctx, testVisitMarker.GetVisitMarkerFilePath(), reader) + require.NoError(t, err) + + err = visitMarkerManager.ReloadVisitMarker(ctx) + require.NoError(t, err) + require.Equal(t, ownerIdentifier, testVisitMarker.OwnerIdentifier) + require.Equal(t, Completed, testVisitMarker.Status) + require.Equal(t, newValue, testVisitMarker.StoredValue) +} + +func TestUpdateExistingVisitMarker(t *testing.T) { + ctx := context.Background() + dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + + ownerIdentifier1 := "test-owner-1" + testVisitMarker1 := NewTestVisitMarker(ownerIdentifier1) + visitMarkerManager1 := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier1, testVisitMarker1, dummyCounter, dummyCounter) + visitMarkerManager1.MarkInProgress(ctx) + + ownerIdentifier2 := "test-owner-2" + testVisitMarker2 := &TestVisitMarker{ + OwnerIdentifier: ownerIdentifier2, + markerID: testVisitMarker1.markerID, + StoredValue: testVisitMarker1.StoredValue, + } + visitMarkerManager2 := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier2, testVisitMarker2, dummyCounter, dummyCounter) + visitMarkerManager2.MarkCompleted(ctx) + + visitMarkerFromFile := &TestVisitMarker{} + err := visitMarkerManager2.ReadVisitMarker(ctx, visitMarkerFromFile) + require.NoError(t, err) + require.Equal(t, ownerIdentifier2, visitMarkerFromFile.OwnerIdentifier) + require.Equal(t, Completed, visitMarkerFromFile.Status) +} + +func TestHeartBeat(t *testing.T) { + for _, tcase := range []struct { + name string + isCancelled bool + callerErr error + expectedStatus VisitStatus + deleteOnExit bool + }{ + { + name: "heart beat got cancelled", + isCancelled: true, + callerErr: nil, + expectedStatus: Pending, + deleteOnExit: false, + }, + { + name: "heart beat complete without error", + isCancelled: false, + callerErr: nil, + expectedStatus: Completed, + deleteOnExit: false, + }, + { + name: "heart beat stopped due to halt error", + isCancelled: false, + callerErr: compact.HaltError{}, + expectedStatus: Failed, + deleteOnExit: false, + }, + { + name: "heart beat stopped due to non halt error", + isCancelled: false, + callerErr: fmt.Errorf("some error"), + expectedStatus: Pending, + deleteOnExit: false, + }, + { + name: "heart beat got cancelled and delete visit marker on exit", + isCancelled: true, + callerErr: nil, + expectedStatus: Pending, + deleteOnExit: true, + }, + { + name: "heart beat complete without error and delete visit marker on exit", + isCancelled: false, + callerErr: nil, + expectedStatus: Completed, + deleteOnExit: true, + }, + { + name: "heart beat stopped due to caller error and delete visit marker on exit", + isCancelled: false, + callerErr: fmt.Errorf("some error"), + expectedStatus: Failed, + deleteOnExit: true, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + errChan := make(chan error, 1) + + ownerIdentifier := "test-owner" + testVisitMarker := NewTestVisitMarker(ownerIdentifier) + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) + go visitMarkerManager.HeartBeat(ctx, errChan, time.Second, tcase.deleteOnExit) + + time.Sleep(2 * time.Second) + if tcase.isCancelled { + cancel() + } else { + errChan <- tcase.callerErr + defer cancel() + } + time.Sleep(2 * time.Second) + + if tcase.deleteOnExit { + exists, err := bkt.Exists(context.Background(), testVisitMarker.GetVisitMarkerFilePath()) + require.NoError(t, err) + require.False(t, exists) + } else { + err := visitMarkerManager.ReloadVisitMarker(context.Background()) + require.NoError(t, err) + require.Equal(t, tcase.expectedStatus, testVisitMarker.Status) + } + }) + } +} + +type TestVisitMarker struct { + OwnerIdentifier string `json:"ownerIdentifier"` + Status VisitStatus `json:"status"` + StoredValue string `json:"storedValue"` + + markerID ulid.ULID +} + +func (t *TestVisitMarker) IsExpired(visitMarkerTimeout time.Duration) bool { + return true +} + +func (t *TestVisitMarker) IsCompleted() bool { + return t.Status == Completed +} + +func (t *TestVisitMarker) IsFailed() bool { + return t.Status == Failed +} + +func (t *TestVisitMarker) IsPending() bool { + return t.Status == Pending +} + +func (t *TestVisitMarker) IsInProgress() bool { + return t.Status == InProgress +} + +func NewTestVisitMarker(ownerIdentifier string) *TestVisitMarker { + return &TestVisitMarker{ + OwnerIdentifier: ownerIdentifier, + markerID: ulid.MustNew(uint64(time.Now().UnixMilli()), rand.Reader), + StoredValue: "initial value", + } +} + +func (t *TestVisitMarker) GetVisitMarkerFilePath() string { + return fmt.Sprintf("test-visit-marker-%s.json", t.markerID.String()) +} + +func (t *TestVisitMarker) MarkInProgress(ownerIdentifier string) { + t.OwnerIdentifier = ownerIdentifier + t.Status = InProgress +} + +func (t *TestVisitMarker) MarkPending(ownerIdentifier string) { + t.OwnerIdentifier = ownerIdentifier + t.Status = Pending +} + +func (t *TestVisitMarker) MarkCompleted(ownerIdentifier string) { + t.OwnerIdentifier = ownerIdentifier + t.Status = Completed +} + +func (t *TestVisitMarker) MarkFailed(ownerIdentifier string) { + t.OwnerIdentifier = ownerIdentifier + t.Status = Failed +} + +func (t *TestVisitMarker) LogInfo() []string { + return []string{"id", t.markerID.String(), "ownerIdentifier", t.OwnerIdentifier, "status", string(t.Status), "storedValue", t.StoredValue} +} From 2742aeb44f53596d67018897fd01a3e7c6ec2908 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 23 Jul 2024 17:34:31 -0700 Subject: [PATCH 02/17] update CHANGELOG Signed-off-by: Alex Le --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f04808f267..3ed1c3cda7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 * [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081 * [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104 +* [FEATURE] Compactor: Introduce cleaner visit marker. #6113 * [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987 * [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916 From a745e07c614a7f78389f54661af64e152c3d299b Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 24 Jul 2024 09:22:16 -0700 Subject: [PATCH 03/17] update CHANGELOG and fix lint Signed-off-by: Alex Le --- CHANGELOG.md | 2 +- docs/blocks-storage/compactor.md | 10 ++++++++++ docs/configuration/config-file-reference.md | 9 +++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ed1c3cda7..ffbd65f78a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,6 @@ * [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 * [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081 * [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104 -* [FEATURE] Compactor: Introduce cleaner visit marker. #6113 * [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987 * [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916 @@ -32,6 +31,7 @@ * [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097 * [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040 * [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112 +* [ENHANCEMENT] Compactor: Introduce cleaner visit marker. #6113 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920 * [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952 * [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018 diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 5fb44208ec..e42364c085 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -295,6 +295,16 @@ compactor: # CLI flag: -compactor.block-visit-marker-file-update-interval [block_visit_marker_file_update_interval: | default = 1m] + # How long cleaner visit marker file should be considered as expired and able + # to be picked up by cleaner again. + # CLI flag: -compactor.cleaner-visit-marker-timeout + [cleaner_visit_marker_timeout: | default = 10m] + + # How frequently cleaner visit marker file should be updated when cleaning + # user. + # CLI flag: -compactor.cleaner-visit-marker-file-update-interval + [cleaner_visit_marker_file_update_interval: | default = 5m] + # When enabled, index verification will ignore out of order label names. # CLI flag: -compactor.accept-malformed-index [accept_malformed_index: | default = false] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 1e730d99e1..b4315532f5 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2214,6 +2214,15 @@ sharding_ring: # CLI flag: -compactor.block-visit-marker-file-update-interval [block_visit_marker_file_update_interval: | default = 1m] +# How long cleaner visit marker file should be considered as expired and able to +# be picked up by cleaner again. +# CLI flag: -compactor.cleaner-visit-marker-timeout +[cleaner_visit_marker_timeout: | default = 10m] + +# How frequently cleaner visit marker file should be updated when cleaning user. +# CLI flag: -compactor.cleaner-visit-marker-file-update-interval +[cleaner_visit_marker_file_update_interval: | default = 5m] + # When enabled, index verification will ignore out of order label names. # CLI flag: -compactor.accept-malformed-index [accept_malformed_index: | default = false] From 6590605b964f5c5f8d343fc3bbd1717ac6d17b76 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 24 Jul 2024 12:15:32 -0700 Subject: [PATCH 04/17] removed unused function Signed-off-by: Alex Le --- pkg/compactor/visit_marker.go | 9 -------- pkg/compactor/visit_marker_test.go | 37 ------------------------------ 2 files changed, 46 deletions(-) diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go index 8b363998e8..28f8459df9 100644 --- a/pkg/compactor/visit_marker.go +++ b/pkg/compactor/visit_marker.go @@ -159,15 +159,6 @@ func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context) { level.Debug(v.getLogger()).Log("msg", "visit marker deleted") } -func (v *VisitMarkerManager) ReloadVisitMarker(ctx context.Context) error { - if err := v.ReadVisitMarker(ctx, v.visitMarker); err != nil { - return err - } - v.setLogger(log.With(v.getLogger(), v.visitMarker.LogInfo())) - level.Debug(v.getLogger()).Log("msg", "visit marker reloaded") - return nil -} - func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error { visitMarkerFile := v.visitMarker.GetVisitMarkerFilePath() visitMarkerFileReader, err := v.bkt.ReaderWithExpectedErrs(v.bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile) diff --git a/pkg/compactor/visit_marker_test.go b/pkg/compactor/visit_marker_test.go index 184ce81163..439fdb7010 100644 --- a/pkg/compactor/visit_marker_test.go +++ b/pkg/compactor/visit_marker_test.go @@ -1,10 +1,8 @@ package compactor import ( - "bytes" "context" "crypto/rand" - "encoding/json" "fmt" "testing" "time" @@ -79,37 +77,6 @@ func TestMarkCompleted(t *testing.T) { require.Equal(t, Completed, visitMarkerFromFile.Status) } -func TestReloadVisitMarker(t *testing.T) { - ctx := context.Background() - dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) - bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) - logger := log.NewNopLogger() - - ownerIdentifier := "test-owner" - testVisitMarker := NewTestVisitMarker(ownerIdentifier) - - visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) - - newValue := "updated stored value" - updatedVisitMarker := TestVisitMarker{ - OwnerIdentifier: ownerIdentifier, - Status: Completed, - StoredValue: newValue, - } - visitMarkerFileContent, err := json.Marshal(updatedVisitMarker) - require.NoError(t, err) - - reader := bytes.NewReader(visitMarkerFileContent) - err = bkt.Upload(ctx, testVisitMarker.GetVisitMarkerFilePath(), reader) - require.NoError(t, err) - - err = visitMarkerManager.ReloadVisitMarker(ctx) - require.NoError(t, err) - require.Equal(t, ownerIdentifier, testVisitMarker.OwnerIdentifier) - require.Equal(t, Completed, testVisitMarker.Status) - require.Equal(t, newValue, testVisitMarker.StoredValue) -} - func TestUpdateExistingVisitMarker(t *testing.T) { ctx := context.Background() dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) @@ -220,10 +187,6 @@ func TestHeartBeat(t *testing.T) { exists, err := bkt.Exists(context.Background(), testVisitMarker.GetVisitMarkerFilePath()) require.NoError(t, err) require.False(t, exists) - } else { - err := visitMarkerManager.ReloadVisitMarker(context.Background()) - require.NoError(t, err) - require.Equal(t, tcase.expectedStatus, testVisitMarker.Status) } }) } From cc55af09842204fb60607ba825888e8c0323219e Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 24 Jul 2024 12:22:33 -0700 Subject: [PATCH 05/17] fix lint Signed-off-by: Alex Le --- pkg/compactor/visit_marker.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go index 28f8459df9..e46ea19d87 100644 --- a/pkg/compactor/visit_marker.go +++ b/pkg/compactor/visit_marker.go @@ -203,9 +203,3 @@ func (v *VisitMarkerManager) getLogger() log.Logger { defer v.mutex.Unlock() return v.logger } - -func (v *VisitMarkerManager) setLogger(logger log.Logger) { - v.mutex.Lock() - defer v.mutex.Unlock() - v.logger = logger -} From 509427ee59b10bbd3a558d970e8fce898f26293c Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 24 Jul 2024 18:10:50 -0700 Subject: [PATCH 06/17] fixed test Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner_test.go | 24 ++++++++++++++++++++++++ pkg/compactor/compactor_test.go | 6 ++++++ 2 files changed, 30 insertions(+) diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 929568b031..c68d0a1422 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -628,6 +628,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Existing behaviour - retention period disabled. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + cfgProvider.userRetentionPeriods["user-1"] = 0 cfgProvider.userRetentionPeriods["user-2"] = 0 @@ -662,6 +666,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Retention enabled only for a single user, but does nothing. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + cfgProvider.userRetentionPeriods["user-1"] = 9 * time.Hour activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) @@ -677,6 +685,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Retention enabled only for a single user, marking a single block. // Note the block won't be deleted yet due to deletion delay. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + cfgProvider.userRetentionPeriods["user-1"] = 7 * time.Hour activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) @@ -710,6 +722,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Marking the block again, before the deletion occurs, should not cause an error. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) require.NoError(t, err) require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false)) @@ -722,6 +738,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Reduce the deletion delay. Now the block will be deleted. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + cleaner.cfg.DeletionDelay = 0 activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) @@ -755,6 +775,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Retention enabled for other user; test deleting multiple blocks. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + cfgProvider.userRetentionPeriods["user-2"] = 5 * time.Hour activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 7cbe71f607..908f962cf2 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -2044,6 +2044,9 @@ func TestCompactor_FailedWithRetriableError(t *testing.T) { bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", nil, errors.New("test retriable error")) @@ -2095,6 +2098,9 @@ func TestCompactor_FailedWithHaltError(t *testing.T) { bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", nil, compact.HaltError{}) From 10762a671f2126c12ca595264eeee932117165db Mon Sep 17 00:00:00 2001 From: Alex Le Date: Mon, 29 Jul 2024 14:38:27 -0700 Subject: [PATCH 07/17] refactor Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner.go | 17 ++++++------ pkg/compactor/blocks_cleaner_test.go | 12 ++++---- pkg/compactor/cleaner_visit_marker.go | 40 ++++----------------------- pkg/compactor/compactor.go | 23 +++++++-------- pkg/compactor/visit_marker.go | 26 +++++++---------- pkg/compactor/visit_marker_test.go | 35 +++-------------------- 6 files changed, 46 insertions(+), 107 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index f5d17a0a00..50b5994547 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -66,8 +66,8 @@ type BlocksCleaner struct { blocksCleanedTotal prometheus.Counter blocksFailedTotal prometheus.Counter blocksMarkedForDeletion *prometheus.CounterVec - CleanerVisitMarkerReadFailed prometheus.Counter - CleanerVisitMarkerWriteFailed prometheus.Counter + cleanerVisitMarkerReadFailed prometheus.Counter + cleanerVisitMarkerWriteFailed prometheus.Counter tenantBlocks *prometheus.GaugeVec tenantBlocksMarkedForDelete *prometheus.GaugeVec tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec @@ -83,6 +83,7 @@ func NewBlocksCleaner( usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, + ringLifecyclerID string, reg prometheus.Registerer, cleanerVisitMarkerTimeout time.Duration, cleanerVisitMarkerFileUpdateInterval time.Duration, @@ -94,7 +95,7 @@ func NewBlocksCleaner( usersScanner: usersScanner, cfgProvider: cfgProvider, logger: log.With(logger, "component", "cleaner"), - ringLifecyclerID: "default-cleaner", + ringLifecyclerID: ringLifecyclerID, cleanerVisitMarkerTimeout: cleanerVisitMarkerTimeout, cleanerVisitMarkerFileUpdateInterval: cleanerVisitMarkerFileUpdateInterval, runsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ @@ -122,11 +123,11 @@ func NewBlocksCleaner( Help: "Total number of blocks failed to be deleted.", }), blocksMarkedForDeletion: blocksMarkedForDeletion, - CleanerVisitMarkerReadFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + cleanerVisitMarkerReadFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_cleaner_visit_marker_read_failed", Help: "Number of cleaner visit marker file failed to be read.", }), - CleanerVisitMarkerWriteFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + cleanerVisitMarkerWriteFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_cleaner_visit_marker_write_failed", Help: "Number of cleaner visit marker file failed to be written.", }), @@ -349,14 +350,14 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (*VisitMarkerManager, error) { cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID) - visitMarkerManager := NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker, c.CleanerVisitMarkerReadFailed, c.CleanerVisitMarkerWriteFailed) + visitMarkerManager := NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker, c.cleanerVisitMarkerReadFailed, c.cleanerVisitMarkerWriteFailed) existingCleanerVisitMarker := &CleanerVisitMarker{} err := visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker) - if err != nil && !errors.Is(err, ErrorVisitMarkerNotFound) { + if err != nil && !errors.Is(err, errorVisitMarkerNotFound) { return nil, errors.Wrapf(err, "failed to read cleaner visit marker") } - if errors.Is(err, ErrorVisitMarkerNotFound) || !existingCleanerVisitMarker.IsVisited(c.cleanerVisitMarkerTimeout) { + if errors.Is(err, errorVisitMarkerNotFound) || !existingCleanerVisitMarker.IsVisited(c.cleanerVisitMarkerTimeout) { return visitMarkerManager, nil } return nil, nil diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index c68d0a1422..d3c7aa6da9 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -87,7 +87,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) // Clean User with no error cleaner.bucketClient = bkt @@ -194,7 +194,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -355,7 +355,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -419,7 +419,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -477,7 +477,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) require.NoError(t, err) require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true)) @@ -618,7 +618,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) assertBlockExists := func(user string, block ulid.ULID, expectExists bool) { exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename)) diff --git a/pkg/compactor/cleaner_visit_marker.go b/pkg/compactor/cleaner_visit_marker.go index 57ba49402c..ec071a2e3e 100644 --- a/pkg/compactor/cleaner_visit_marker.go +++ b/pkg/compactor/cleaner_visit_marker.go @@ -35,50 +35,20 @@ func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) } func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool { - return !b.IsCompleted() && !b.IsFailed() && !b.IsExpired(cleanerVisitMarkerTimeout) + return !(b.GetStatus() == Completed) && !(b.GetStatus() == Failed) && !b.IsExpired(cleanerVisitMarkerTimeout) } -func (b *CleanerVisitMarker) IsCompleted() bool { - return b.Status == Completed -} - -func (b *CleanerVisitMarker) IsFailed() bool { - return b.Status == Failed -} - -func (b *CleanerVisitMarker) IsInProgress() bool { - return b.Status == InProgress -} - -func (b *CleanerVisitMarker) IsPending() bool { - return b.Status == Pending +func (b *CleanerVisitMarker) GetStatus() VisitStatus { + return b.Status } func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string { return GetCleanerVisitMarkerFilePath() } -func (b *CleanerVisitMarker) MarkInProgress(ownerIdentifier string) { - b.CompactorID = ownerIdentifier - b.Status = InProgress - b.VisitTime = time.Now().Unix() -} - -func (b *CleanerVisitMarker) MarkPending(ownerIdentifier string) { - b.CompactorID = ownerIdentifier - b.Status = Pending - b.VisitTime = time.Now().Unix() -} - -func (b *CleanerVisitMarker) MarkCompleted(ownerIdentifier string) { - b.CompactorID = ownerIdentifier - b.Status = Completed - b.VisitTime = time.Now().Unix() -} - -func (b *CleanerVisitMarker) MarkFailed(ownerIdentifier string) { +func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus) { b.CompactorID = ownerIdentifier - b.Status = Failed + b.Status = status b.VisitTime = time.Now().Unix() } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 57428247ce..6ca567a4e8 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -529,16 +529,7 @@ func (c *Compactor) starting(ctx context.Context) error { // Create the users scanner. c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger) - // Create the blocks cleaner (service). - c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{ - DeletionDelay: c.compactorCfg.DeletionDelay, - CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1), - CleanupConcurrency: c.compactorCfg.CleanupConcurrency, - BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled, - TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, - }, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval, - c.compactorMetrics.syncerBlocksMarkedForDeletion) - + var cleanerRingLifecyclerID = "default-cleaner" // Initialize the compactors ring if sharding is enabled. if c.compactorCfg.ShardingEnabled { lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig() @@ -547,7 +538,7 @@ func (c *Compactor) starting(ctx context.Context) error { return errors.Wrap(err, "unable to initialize compactor ring lifecycler") } - c.blocksCleaner.SetRingLifecyclerID(c.ringLifecycler.ID) + cleanerRingLifecyclerID = c.ringLifecycler.ID c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) if err != nil { @@ -598,6 +589,16 @@ func (c *Compactor) starting(ctx context.Context) error { } } + // Create the blocks cleaner (service). + c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{ + DeletionDelay: c.compactorCfg.DeletionDelay, + CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1), + CleanupConcurrency: c.compactorCfg.CleanupConcurrency, + BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled, + TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, + }, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval, + c.compactorMetrics.syncerBlocksMarkedForDeletion) + // Ensure an initial cleanup occurred before starting the compactor. if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil { c.ringSubservices.StopAsync() diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go index e46ea19d87..dba57d7d25 100644 --- a/pkg/compactor/visit_marker.go +++ b/pkg/compactor/visit_marker.go @@ -20,8 +20,8 @@ import ( ) var ( - ErrorVisitMarkerNotFound = errors.New("visit marker not found") - ErrorUnmarshalVisitMarker = errors.New("unmarshal visit marker JSON") + errorVisitMarkerNotFound = errors.New("visit marker not found") + errorUnmarshalVisitMarker = errors.New("unmarshal visit marker JSON") ) type VisitStatus string @@ -35,16 +35,10 @@ const ( type VisitMarker interface { GetVisitMarkerFilePath() string - MarkInProgress(ownerIdentifier string) - MarkPending(ownerIdentifier string) - MarkCompleted(ownerIdentifier string) - MarkFailed(ownerIdentifier string) + UpdateStatus(ownerIdentifier string, status VisitStatus) + GetStatus() VisitStatus LogInfo() []string IsExpired(visitMarkerTimeout time.Duration) bool - IsCompleted() bool - IsFailed() bool - IsInProgress() bool - IsPending() bool } type VisitMarkerManager struct { @@ -116,7 +110,7 @@ heartBeat: } func (v *VisitMarkerManager) MarkInProgress(ctx context.Context) { - v.visitMarker.MarkInProgress(v.ownerIdentifier) + v.visitMarker.UpdateStatus(v.ownerIdentifier, InProgress) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) return @@ -125,7 +119,7 @@ func (v *VisitMarkerManager) MarkInProgress(ctx context.Context) { } func (v *VisitMarkerManager) MarkPending(ctx context.Context) { - v.visitMarker.MarkPending(v.ownerIdentifier) + v.visitMarker.UpdateStatus(v.ownerIdentifier, Pending) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) return @@ -134,7 +128,7 @@ func (v *VisitMarkerManager) MarkPending(ctx context.Context) { } func (v *VisitMarkerManager) MarkCompleted(ctx context.Context) { - v.visitMarker.MarkCompleted(v.ownerIdentifier) + v.visitMarker.UpdateStatus(v.ownerIdentifier, Completed) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) return @@ -143,7 +137,7 @@ func (v *VisitMarkerManager) MarkCompleted(ctx context.Context) { } func (v *VisitMarkerManager) MarkFailed(ctx context.Context) { - v.visitMarker.MarkFailed(v.ownerIdentifier) + v.visitMarker.UpdateStatus(v.ownerIdentifier, Failed) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) return @@ -164,7 +158,7 @@ func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker an visitMarkerFileReader, err := v.bkt.ReaderWithExpectedErrs(v.bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile) if err != nil { if v.bkt.IsObjNotFoundErr(err) { - return errors.Wrapf(ErrorVisitMarkerNotFound, "visit marker file: %s", visitMarkerFile) + return errors.Wrapf(errorVisitMarkerNotFound, "visit marker file: %s", visitMarkerFile) } v.visitMarkerReadFailed.Inc() return errors.Wrapf(err, "get visit marker file: %s", visitMarkerFile) @@ -177,7 +171,7 @@ func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker an } if err = json.Unmarshal(b, visitMarker); err != nil { v.visitMarkerReadFailed.Inc() - return errors.Wrapf(ErrorUnmarshalVisitMarker, "visit marker file: %s, content: %s, error: %v", visitMarkerFile, string(b), err.Error()) + return errors.Wrapf(errorUnmarshalVisitMarker, "visit marker file: %s, content: %s, error: %v", visitMarkerFile, string(b), err.Error()) } level.Debug(v.getLogger()).Log("msg", "visit marker read from file", "visit_marker_file", visitMarkerFile) return nil diff --git a/pkg/compactor/visit_marker_test.go b/pkg/compactor/visit_marker_test.go index 439fdb7010..9afbb0a64f 100644 --- a/pkg/compactor/visit_marker_test.go +++ b/pkg/compactor/visit_marker_test.go @@ -204,20 +204,8 @@ func (t *TestVisitMarker) IsExpired(visitMarkerTimeout time.Duration) bool { return true } -func (t *TestVisitMarker) IsCompleted() bool { - return t.Status == Completed -} - -func (t *TestVisitMarker) IsFailed() bool { - return t.Status == Failed -} - -func (t *TestVisitMarker) IsPending() bool { - return t.Status == Pending -} - -func (t *TestVisitMarker) IsInProgress() bool { - return t.Status == InProgress +func (t *TestVisitMarker) GetStatus() VisitStatus { + return t.Status } func NewTestVisitMarker(ownerIdentifier string) *TestVisitMarker { @@ -232,24 +220,9 @@ func (t *TestVisitMarker) GetVisitMarkerFilePath() string { return fmt.Sprintf("test-visit-marker-%s.json", t.markerID.String()) } -func (t *TestVisitMarker) MarkInProgress(ownerIdentifier string) { - t.OwnerIdentifier = ownerIdentifier - t.Status = InProgress -} - -func (t *TestVisitMarker) MarkPending(ownerIdentifier string) { - t.OwnerIdentifier = ownerIdentifier - t.Status = Pending -} - -func (t *TestVisitMarker) MarkCompleted(ownerIdentifier string) { - t.OwnerIdentifier = ownerIdentifier - t.Status = Completed -} - -func (t *TestVisitMarker) MarkFailed(ownerIdentifier string) { +func (t *TestVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus) { t.OwnerIdentifier = ownerIdentifier - t.Status = Failed + t.Status = status } func (t *TestVisitMarker) LogInfo() []string { From 3d202f1244964a9963bd322069e0b36f29363590 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Mon, 29 Jul 2024 17:14:05 -0700 Subject: [PATCH 08/17] refactor Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 50b5994547..e85b393e81 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -128,8 +128,8 @@ func NewBlocksCleaner( Help: "Number of cleaner visit marker file failed to be read.", }), cleanerVisitMarkerWriteFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_cleaner_visit_marker_write_failed", - Help: "Number of cleaner visit marker file failed to be written.", + Name: "cortex_compactor_cleaner_visit_marker_write_failed_total", + Help: "Total number of cleaner visit marker file failed to be written.", }), // The following metrics don't have the "cortex_compactor" prefix because not strictly related to @@ -271,11 +271,11 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string, return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { userLogger := util_log.WithUserID(userID, c.logger) userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) - visitMarkerManager, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket) + visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket) if err != nil { return err } - if visitMarkerManager == nil { + if isVisited { return nil } errChan := make(chan error, 1) @@ -306,11 +306,11 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { userLogger := util_log.WithUserID(userID, c.logger) userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) - visitMarkerManager, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket) + visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket) if err != nil { return err } - if visitMarkerManager == nil { + if isVisited { return nil } errChan := make(chan error, 1) @@ -348,19 +348,17 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro return users, deleted, nil } -func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (*VisitMarkerManager, error) { +func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) { cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID) - visitMarkerManager := NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker, c.cleanerVisitMarkerReadFailed, c.cleanerVisitMarkerWriteFailed) + visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker, c.cleanerVisitMarkerReadFailed, c.cleanerVisitMarkerWriteFailed) existingCleanerVisitMarker := &CleanerVisitMarker{} - err := visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker) + err = visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker) if err != nil && !errors.Is(err, errorVisitMarkerNotFound) { - return nil, errors.Wrapf(err, "failed to read cleaner visit marker") + return nil, false, errors.Wrapf(err, "failed to read cleaner visit marker") } - if errors.Is(err, errorVisitMarkerNotFound) || !existingCleanerVisitMarker.IsVisited(c.cleanerVisitMarkerTimeout) { - return visitMarkerManager, nil - } - return nil, nil + isVisited = !errors.Is(err, errorVisitMarkerNotFound) && existingCleanerVisitMarker.IsVisited(c.cleanerVisitMarkerTimeout) + return visitMarkerManager, isVisited, nil } // Remove blocks and remaining data for tenant marked for deletion. From 76d08f8ddb7a7c3144240e288efea1b0d91f035e Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 30 Jul 2024 16:12:44 -0700 Subject: [PATCH 09/17] refactor Signed-off-by: Alex Le --- pkg/compactor/cleaner_visit_marker.go | 12 ++++---- pkg/compactor/visit_marker.go | 19 +++++++------ pkg/compactor/visit_marker_test.go | 41 +++++++++++++++++++++++++-- 3 files changed, 55 insertions(+), 17 deletions(-) diff --git a/pkg/compactor/cleaner_visit_marker.go b/pkg/compactor/cleaner_visit_marker.go index ec071a2e3e..b31e881066 100644 --- a/pkg/compactor/cleaner_visit_marker.go +++ b/pkg/compactor/cleaner_visit_marker.go @@ -1,6 +1,7 @@ package compactor import ( + "fmt" "path" "time" @@ -52,15 +53,12 @@ func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitSt b.VisitTime = time.Now().Unix() } -func (b *CleanerVisitMarker) LogInfo() []string { - return []string{ - "compactor_id", +func (b *CleanerVisitMarker) String() string { + return fmt.Sprintf("compactor_id=%s status=%s visit_time=%s", b.CompactorID, - "status", - string(b.Status), - "visit_time", + b.Status, time.Unix(b.VisitTime, 0).String(), - } + ) } func GetCleanerVisitMarkerFilePath() string { diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go index dba57d7d25..5e1d939123 100644 --- a/pkg/compactor/visit_marker.go +++ b/pkg/compactor/visit_marker.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "sync" "time" "github.com/go-kit/log" @@ -37,8 +36,8 @@ type VisitMarker interface { GetVisitMarkerFilePath() string UpdateStatus(ownerIdentifier string, status VisitStatus) GetStatus() VisitStatus - LogInfo() []string IsExpired(visitMarkerTimeout time.Duration) bool + String() string } type VisitMarkerManager struct { @@ -48,8 +47,6 @@ type VisitMarkerManager struct { visitMarker VisitMarker visitMarkerReadFailed prometheus.Counter visitMarkerWriteFailed prometheus.Counter - - mutex sync.Mutex } func NewVisitMarkerManager( @@ -62,7 +59,7 @@ func NewVisitMarkerManager( ) *VisitMarkerManager { return &VisitMarkerManager{ bkt: bkt, - logger: log.With(logger, "type", fmt.Sprintf("%T", visitMarker), visitMarker.LogInfo()), + logger: log.With(logger, "type", fmt.Sprintf("%T", visitMarker)), ownerIdentifier: ownerIdentifier, visitMarker: visitMarker, visitMarkerReadFailed: visitMarkerReadFailed, @@ -153,6 +150,14 @@ func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context) { level.Debug(v.getLogger()).Log("msg", "visit marker deleted") } +func (v *VisitMarkerManager) ReloadVisitMarker(ctx context.Context) error { + if err := v.ReadVisitMarker(ctx, v.visitMarker); err != nil { + return err + } + level.Debug(v.getLogger()).Log("msg", "visit marker reloaded") + return nil +} + func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error { visitMarkerFile := v.visitMarker.GetVisitMarkerFilePath() visitMarkerFileReader, err := v.bkt.ReaderWithExpectedErrs(v.bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile) @@ -193,7 +198,5 @@ func (v *VisitMarkerManager) updateVisitMarker(ctx context.Context) error { } func (v *VisitMarkerManager) getLogger() log.Logger { - v.mutex.Lock() - defer v.mutex.Unlock() - return v.logger + return log.With(v.logger, "visit_marker", v.visitMarker.String()) } diff --git a/pkg/compactor/visit_marker_test.go b/pkg/compactor/visit_marker_test.go index 9afbb0a64f..2b12f7926d 100644 --- a/pkg/compactor/visit_marker_test.go +++ b/pkg/compactor/visit_marker_test.go @@ -1,8 +1,10 @@ package compactor import ( + "bytes" "context" "crypto/rand" + "encoding/json" "fmt" "testing" "time" @@ -77,6 +79,37 @@ func TestMarkCompleted(t *testing.T) { require.Equal(t, Completed, visitMarkerFromFile.Status) } +func TestReloadVisitMarker(t *testing.T) { + ctx := context.Background() + dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + + ownerIdentifier := "test-owner" + testVisitMarker := NewTestVisitMarker(ownerIdentifier) + + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) + + newValue := "updated stored value" + updatedVisitMarker := TestVisitMarker{ + OwnerIdentifier: ownerIdentifier, + Status: Completed, + StoredValue: newValue, + } + visitMarkerFileContent, err := json.Marshal(updatedVisitMarker) + require.NoError(t, err) + + reader := bytes.NewReader(visitMarkerFileContent) + err = bkt.Upload(ctx, testVisitMarker.GetVisitMarkerFilePath(), reader) + require.NoError(t, err) + + err = visitMarkerManager.ReloadVisitMarker(ctx) + require.NoError(t, err) + require.Equal(t, ownerIdentifier, testVisitMarker.OwnerIdentifier) + require.Equal(t, Completed, testVisitMarker.Status) + require.Equal(t, newValue, testVisitMarker.StoredValue) +} + func TestUpdateExistingVisitMarker(t *testing.T) { ctx := context.Background() dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) @@ -187,6 +220,10 @@ func TestHeartBeat(t *testing.T) { exists, err := bkt.Exists(context.Background(), testVisitMarker.GetVisitMarkerFilePath()) require.NoError(t, err) require.False(t, exists) + } else { + err := visitMarkerManager.ReloadVisitMarker(context.Background()) + require.NoError(t, err) + require.Equal(t, tcase.expectedStatus, testVisitMarker.Status) } }) } @@ -225,6 +262,6 @@ func (t *TestVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatu t.Status = status } -func (t *TestVisitMarker) LogInfo() []string { - return []string{"id", t.markerID.String(), "ownerIdentifier", t.OwnerIdentifier, "status", string(t.Status), "storedValue", t.StoredValue} +func (t *TestVisitMarker) String() string { + return fmt.Sprintf("id=%s ownerIdentifier=%s status=%s storedValue=%s", t.markerID.String(), t.OwnerIdentifier, t.Status, t.StoredValue) } From 249da6819a925e73084a4a15f563163009228109 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 30 Jul 2024 16:16:29 -0700 Subject: [PATCH 10/17] rename metric Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index e85b393e81..3ee5599438 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -124,8 +124,8 @@ func NewBlocksCleaner( }), blocksMarkedForDeletion: blocksMarkedForDeletion, cleanerVisitMarkerReadFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_cleaner_visit_marker_read_failed", - Help: "Number of cleaner visit marker file failed to be read.", + Name: "cortex_compactor_cleaner_visit_marker_read_failed_total", + Help: "Total number of cleaner visit marker file failed to be read.", }), cleanerVisitMarkerWriteFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_cleaner_visit_marker_write_failed_total", From e3c0c6e944bb9ebf6a6d2b1c2a8ac268d097cab4 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 30 Jul 2024 17:27:47 -0700 Subject: [PATCH 11/17] use mutex to lock all write operations Signed-off-by: Alex Le --- pkg/compactor/visit_marker.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go index 5e1d939123..b271266f6f 100644 --- a/pkg/compactor/visit_marker.go +++ b/pkg/compactor/visit_marker.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "sync" "time" "github.com/go-kit/log" @@ -47,6 +48,8 @@ type VisitMarkerManager struct { visitMarker VisitMarker visitMarkerReadFailed prometheus.Counter visitMarkerWriteFailed prometheus.Counter + + mutex sync.Mutex } func NewVisitMarkerManager( @@ -107,6 +110,8 @@ heartBeat: } func (v *VisitMarkerManager) MarkInProgress(ctx context.Context) { + v.mutex.Lock() + defer v.mutex.Unlock() v.visitMarker.UpdateStatus(v.ownerIdentifier, InProgress) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) @@ -116,6 +121,8 @@ func (v *VisitMarkerManager) MarkInProgress(ctx context.Context) { } func (v *VisitMarkerManager) MarkPending(ctx context.Context) { + v.mutex.Lock() + defer v.mutex.Unlock() v.visitMarker.UpdateStatus(v.ownerIdentifier, Pending) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) @@ -125,6 +132,8 @@ func (v *VisitMarkerManager) MarkPending(ctx context.Context) { } func (v *VisitMarkerManager) MarkCompleted(ctx context.Context) { + v.mutex.Lock() + defer v.mutex.Unlock() v.visitMarker.UpdateStatus(v.ownerIdentifier, Completed) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) @@ -134,6 +143,8 @@ func (v *VisitMarkerManager) MarkCompleted(ctx context.Context) { } func (v *VisitMarkerManager) MarkFailed(ctx context.Context) { + v.mutex.Lock() + defer v.mutex.Unlock() v.visitMarker.UpdateStatus(v.ownerIdentifier, Failed) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) @@ -151,6 +162,8 @@ func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context) { } func (v *VisitMarkerManager) ReloadVisitMarker(ctx context.Context) error { + v.mutex.Lock() + defer v.mutex.Unlock() if err := v.ReadVisitMarker(ctx, v.visitMarker); err != nil { return err } From 1dfd81cea9c7ec2499b6ecf0278523555d4f934a Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 30 Jul 2024 18:03:17 -0700 Subject: [PATCH 12/17] Removed problemic function only used by test Signed-off-by: Alex Le --- pkg/compactor/visit_marker.go | 18 ------------------ pkg/compactor/visit_marker_test.go | 26 ++++++++++++++++++-------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go index b271266f6f..3a1c8e95e3 100644 --- a/pkg/compactor/visit_marker.go +++ b/pkg/compactor/visit_marker.go @@ -110,8 +110,6 @@ heartBeat: } func (v *VisitMarkerManager) MarkInProgress(ctx context.Context) { - v.mutex.Lock() - defer v.mutex.Unlock() v.visitMarker.UpdateStatus(v.ownerIdentifier, InProgress) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) @@ -121,8 +119,6 @@ func (v *VisitMarkerManager) MarkInProgress(ctx context.Context) { } func (v *VisitMarkerManager) MarkPending(ctx context.Context) { - v.mutex.Lock() - defer v.mutex.Unlock() v.visitMarker.UpdateStatus(v.ownerIdentifier, Pending) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) @@ -132,8 +128,6 @@ func (v *VisitMarkerManager) MarkPending(ctx context.Context) { } func (v *VisitMarkerManager) MarkCompleted(ctx context.Context) { - v.mutex.Lock() - defer v.mutex.Unlock() v.visitMarker.UpdateStatus(v.ownerIdentifier, Completed) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) @@ -143,8 +137,6 @@ func (v *VisitMarkerManager) MarkCompleted(ctx context.Context) { } func (v *VisitMarkerManager) MarkFailed(ctx context.Context) { - v.mutex.Lock() - defer v.mutex.Unlock() v.visitMarker.UpdateStatus(v.ownerIdentifier, Failed) if err := v.updateVisitMarker(ctx); err != nil { level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) @@ -161,16 +153,6 @@ func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context) { level.Debug(v.getLogger()).Log("msg", "visit marker deleted") } -func (v *VisitMarkerManager) ReloadVisitMarker(ctx context.Context) error { - v.mutex.Lock() - defer v.mutex.Unlock() - if err := v.ReadVisitMarker(ctx, v.visitMarker); err != nil { - return err - } - level.Debug(v.getLogger()).Log("msg", "visit marker reloaded") - return nil -} - func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error { visitMarkerFile := v.visitMarker.GetVisitMarkerFilePath() visitMarkerFileReader, err := v.bkt.ReaderWithExpectedErrs(v.bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile) diff --git a/pkg/compactor/visit_marker_test.go b/pkg/compactor/visit_marker_test.go index 2b12f7926d..bffec4434a 100644 --- a/pkg/compactor/visit_marker_test.go +++ b/pkg/compactor/visit_marker_test.go @@ -88,8 +88,6 @@ func TestReloadVisitMarker(t *testing.T) { ownerIdentifier := "test-owner" testVisitMarker := NewTestVisitMarker(ownerIdentifier) - visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) - newValue := "updated stored value" updatedVisitMarker := TestVisitMarker{ OwnerIdentifier: ownerIdentifier, @@ -103,11 +101,13 @@ func TestReloadVisitMarker(t *testing.T) { err = bkt.Upload(ctx, testVisitMarker.GetVisitMarkerFilePath(), reader) require.NoError(t, err) - err = visitMarkerManager.ReloadVisitMarker(ctx) + resultTestVisitMarker := NewTestVisitMarker(ownerIdentifier) + resultVisitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, resultTestVisitMarker, dummyCounter, dummyCounter) + err = resultVisitMarkerManager.ReadVisitMarker(context.Background(), resultTestVisitMarker) require.NoError(t, err) - require.Equal(t, ownerIdentifier, testVisitMarker.OwnerIdentifier) - require.Equal(t, Completed, testVisitMarker.Status) - require.Equal(t, newValue, testVisitMarker.StoredValue) + require.Equal(t, ownerIdentifier, resultTestVisitMarker.OwnerIdentifier) + require.Equal(t, Completed, resultTestVisitMarker.Status) + require.Equal(t, newValue, resultTestVisitMarker.StoredValue) } func TestUpdateExistingVisitMarker(t *testing.T) { @@ -221,9 +221,11 @@ func TestHeartBeat(t *testing.T) { require.NoError(t, err) require.False(t, exists) } else { - err := visitMarkerManager.ReloadVisitMarker(context.Background()) + resultTestVisitMarker := CopyTestVisitMarker(testVisitMarker) + resultVisitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, resultTestVisitMarker, dummyCounter, dummyCounter) + err := resultVisitMarkerManager.ReadVisitMarker(context.Background(), resultTestVisitMarker) require.NoError(t, err) - require.Equal(t, tcase.expectedStatus, testVisitMarker.Status) + require.Equal(t, tcase.expectedStatus, resultTestVisitMarker.Status) } }) } @@ -253,6 +255,14 @@ func NewTestVisitMarker(ownerIdentifier string) *TestVisitMarker { } } +func CopyTestVisitMarker(sourceVisitMarker *TestVisitMarker) *TestVisitMarker { + return &TestVisitMarker{ + OwnerIdentifier: sourceVisitMarker.OwnerIdentifier, + markerID: sourceVisitMarker.markerID, + StoredValue: sourceVisitMarker.StoredValue, + } +} + func (t *TestVisitMarker) GetVisitMarkerFilePath() string { return fmt.Sprintf("test-visit-marker-%s.json", t.markerID.String()) } From 1e173966f34f7063779d592a781b906174f3ded5 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 30 Jul 2024 18:05:07 -0700 Subject: [PATCH 13/17] update test order Signed-off-by: Alex Le --- pkg/compactor/visit_marker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compactor/visit_marker_test.go b/pkg/compactor/visit_marker_test.go index bffec4434a..2ed1cd39fd 100644 --- a/pkg/compactor/visit_marker_test.go +++ b/pkg/compactor/visit_marker_test.go @@ -204,6 +204,7 @@ func TestHeartBeat(t *testing.T) { ownerIdentifier := "test-owner" testVisitMarker := NewTestVisitMarker(ownerIdentifier) + resultTestVisitMarker := CopyTestVisitMarker(testVisitMarker) visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) go visitMarkerManager.HeartBeat(ctx, errChan, time.Second, tcase.deleteOnExit) @@ -221,7 +222,6 @@ func TestHeartBeat(t *testing.T) { require.NoError(t, err) require.False(t, exists) } else { - resultTestVisitMarker := CopyTestVisitMarker(testVisitMarker) resultVisitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, resultTestVisitMarker, dummyCounter, dummyCounter) err := resultVisitMarkerManager.ReadVisitMarker(context.Background(), resultTestVisitMarker) require.NoError(t, err) From bce6824a3c02974baeaa58592bab4a80d3996696 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 30 Jul 2024 18:07:06 -0700 Subject: [PATCH 14/17] fix test Signed-off-by: Alex Le --- pkg/compactor/visit_marker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compactor/visit_marker_test.go b/pkg/compactor/visit_marker_test.go index 2ed1cd39fd..005ba7a008 100644 --- a/pkg/compactor/visit_marker_test.go +++ b/pkg/compactor/visit_marker_test.go @@ -87,6 +87,7 @@ func TestReloadVisitMarker(t *testing.T) { ownerIdentifier := "test-owner" testVisitMarker := NewTestVisitMarker(ownerIdentifier) + resultTestVisitMarker := CopyTestVisitMarker(testVisitMarker) newValue := "updated stored value" updatedVisitMarker := TestVisitMarker{ @@ -101,7 +102,6 @@ func TestReloadVisitMarker(t *testing.T) { err = bkt.Upload(ctx, testVisitMarker.GetVisitMarkerFilePath(), reader) require.NoError(t, err) - resultTestVisitMarker := NewTestVisitMarker(ownerIdentifier) resultVisitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, resultTestVisitMarker, dummyCounter, dummyCounter) err = resultVisitMarkerManager.ReadVisitMarker(context.Background(), resultTestVisitMarker) require.NoError(t, err) From e1e68817c2c2d778f920e3d0374cecb97ccae30c Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 30 Jul 2024 18:13:58 -0700 Subject: [PATCH 15/17] fix lint Signed-off-by: Alex Le --- pkg/compactor/visit_marker.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go index 3a1c8e95e3..14637e5d91 100644 --- a/pkg/compactor/visit_marker.go +++ b/pkg/compactor/visit_marker.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "sync" "time" "github.com/go-kit/log" @@ -48,8 +47,6 @@ type VisitMarkerManager struct { visitMarker VisitMarker visitMarkerReadFailed prometheus.Counter visitMarkerWriteFailed prometheus.Counter - - mutex sync.Mutex } func NewVisitMarkerManager( From 37f24589b0c856484daf23a35b9f8adb6af2c0ef Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 31 Jul 2024 09:41:57 -0700 Subject: [PATCH 16/17] refactor Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner.go | 4 --- pkg/compactor/visit_marker.go | 45 ++++++------------------------ pkg/compactor/visit_marker_test.go | 43 ++++------------------------ 3 files changed, 14 insertions(+), 78 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 3ee5599438..633ce7e4e2 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -175,10 +175,6 @@ type cleanerJob struct { timestamp int64 } -func (c *BlocksCleaner) SetRingLifecyclerID(ringLifecyclerID string) { - c.ringLifecyclerID = ringLifecyclerID -} - func (c *BlocksCleaner) starting(ctx context.Context) error { // Run a cleanup so that any other service depending on this service // is guaranteed to start once the initial cleanup has been done. diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go index 14637e5d91..a6329179f7 100644 --- a/pkg/compactor/visit_marker.go +++ b/pkg/compactor/visit_marker.go @@ -73,27 +73,27 @@ func (v *VisitMarkerManager) HeartBeat(ctx context.Context, errChan <-chan error defer ticker.Stop() heartBeat: for { - v.MarkInProgress(ctx) + v.MarkWithStatus(ctx, InProgress) select { case <-ctx.Done(): level.Warn(v.getLogger()).Log("msg", "visit marker heart beat got cancelled") - v.MarkPending(context.Background()) + v.MarkWithStatus(context.Background(), Pending) break heartBeat case <-ticker.C: continue case err := <-errChan: if err == nil { level.Info(v.getLogger()).Log("msg", "update visit marker to completed status") - v.MarkCompleted(ctx) + v.MarkWithStatus(ctx, Completed) } else { level.Warn(v.getLogger()).Log("msg", "stop visit marker heart beat due to error", "err", err) if compact.IsHaltError(err) { level.Info(v.getLogger()).Log("msg", "update visit marker to failed status", "err", err) - v.MarkFailed(ctx) + v.MarkWithStatus(ctx, Failed) } else { level.Info(v.getLogger()).Log("msg", "update visit marker to pending status", "err", err) - v.MarkPending(ctx) + v.MarkWithStatus(ctx, Pending) } } break heartBeat @@ -106,40 +106,13 @@ heartBeat: } } -func (v *VisitMarkerManager) MarkInProgress(ctx context.Context) { - v.visitMarker.UpdateStatus(v.ownerIdentifier, InProgress) +func (v *VisitMarkerManager) MarkWithStatus(ctx context.Context, status VisitStatus) { + v.visitMarker.UpdateStatus(v.ownerIdentifier, status) if err := v.updateVisitMarker(ctx); err != nil { - level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) + level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "new_status", status, "err", err) return } - level.Debug(v.getLogger()).Log("msg", "marked in progress") -} - -func (v *VisitMarkerManager) MarkPending(ctx context.Context) { - v.visitMarker.UpdateStatus(v.ownerIdentifier, Pending) - if err := v.updateVisitMarker(ctx); err != nil { - level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) - return - } - level.Debug(v.getLogger()).Log("msg", "marked pending") -} - -func (v *VisitMarkerManager) MarkCompleted(ctx context.Context) { - v.visitMarker.UpdateStatus(v.ownerIdentifier, Completed) - if err := v.updateVisitMarker(ctx); err != nil { - level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) - return - } - level.Debug(v.getLogger()).Log("msg", "marked completed") -} - -func (v *VisitMarkerManager) MarkFailed(ctx context.Context) { - v.visitMarker.UpdateStatus(v.ownerIdentifier, Failed) - if err := v.updateVisitMarker(ctx); err != nil { - level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "err", err) - return - } - level.Debug(v.getLogger()).Log("msg", "marked failed") + level.Debug(v.getLogger()).Log("msg", "marked with new status", "new_status", status) } func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context) { diff --git a/pkg/compactor/visit_marker_test.go b/pkg/compactor/visit_marker_test.go index 005ba7a008..5e60d32b8f 100644 --- a/pkg/compactor/visit_marker_test.go +++ b/pkg/compactor/visit_marker_test.go @@ -1,10 +1,8 @@ package compactor import ( - "bytes" "context" "crypto/rand" - "encoding/json" "fmt" "testing" "time" @@ -29,7 +27,7 @@ func TestMarkPending(t *testing.T) { testVisitMarker := NewTestVisitMarker(ownerIdentifier) visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) - visitMarkerManager.MarkPending(ctx) + visitMarkerManager.MarkWithStatus(ctx, Pending) require.Equal(t, Pending, testVisitMarker.Status) @@ -49,7 +47,7 @@ func TestMarkInProgress(t *testing.T) { testVisitMarker := NewTestVisitMarker(ownerIdentifier) visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) - visitMarkerManager.MarkInProgress(ctx) + visitMarkerManager.MarkWithStatus(ctx, InProgress) require.Equal(t, InProgress, testVisitMarker.Status) @@ -69,7 +67,7 @@ func TestMarkCompleted(t *testing.T) { testVisitMarker := NewTestVisitMarker(ownerIdentifier) visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) - visitMarkerManager.MarkCompleted(ctx) + visitMarkerManager.MarkWithStatus(ctx, Completed) require.Equal(t, Completed, testVisitMarker.Status) @@ -79,37 +77,6 @@ func TestMarkCompleted(t *testing.T) { require.Equal(t, Completed, visitMarkerFromFile.Status) } -func TestReloadVisitMarker(t *testing.T) { - ctx := context.Background() - dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) - bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) - logger := log.NewNopLogger() - - ownerIdentifier := "test-owner" - testVisitMarker := NewTestVisitMarker(ownerIdentifier) - resultTestVisitMarker := CopyTestVisitMarker(testVisitMarker) - - newValue := "updated stored value" - updatedVisitMarker := TestVisitMarker{ - OwnerIdentifier: ownerIdentifier, - Status: Completed, - StoredValue: newValue, - } - visitMarkerFileContent, err := json.Marshal(updatedVisitMarker) - require.NoError(t, err) - - reader := bytes.NewReader(visitMarkerFileContent) - err = bkt.Upload(ctx, testVisitMarker.GetVisitMarkerFilePath(), reader) - require.NoError(t, err) - - resultVisitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, resultTestVisitMarker, dummyCounter, dummyCounter) - err = resultVisitMarkerManager.ReadVisitMarker(context.Background(), resultTestVisitMarker) - require.NoError(t, err) - require.Equal(t, ownerIdentifier, resultTestVisitMarker.OwnerIdentifier) - require.Equal(t, Completed, resultTestVisitMarker.Status) - require.Equal(t, newValue, resultTestVisitMarker.StoredValue) -} - func TestUpdateExistingVisitMarker(t *testing.T) { ctx := context.Background() dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) @@ -119,7 +86,7 @@ func TestUpdateExistingVisitMarker(t *testing.T) { ownerIdentifier1 := "test-owner-1" testVisitMarker1 := NewTestVisitMarker(ownerIdentifier1) visitMarkerManager1 := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier1, testVisitMarker1, dummyCounter, dummyCounter) - visitMarkerManager1.MarkInProgress(ctx) + visitMarkerManager1.MarkWithStatus(ctx, InProgress) ownerIdentifier2 := "test-owner-2" testVisitMarker2 := &TestVisitMarker{ @@ -128,7 +95,7 @@ func TestUpdateExistingVisitMarker(t *testing.T) { StoredValue: testVisitMarker1.StoredValue, } visitMarkerManager2 := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier2, testVisitMarker2, dummyCounter, dummyCounter) - visitMarkerManager2.MarkCompleted(ctx) + visitMarkerManager2.MarkWithStatus(ctx, Completed) visitMarkerFromFile := &TestVisitMarker{} err := visitMarkerManager2.ReadVisitMarker(ctx, visitMarkerFromFile) From c1b4e8a0f2fcf9c928311a5223ab3cdd77458dc4 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 31 Jul 2024 10:47:55 -0700 Subject: [PATCH 17/17] update doc and remove visit marker metrics Signed-off-by: Alex Le --- docs/blocks-storage/compactor.md | 3 ++- docs/configuration/config-file-reference.md | 3 ++- pkg/compactor/blocks_cleaner.go | 12 +-------- pkg/compactor/compactor.go | 2 +- pkg/compactor/visit_marker.go | 28 ++++++--------------- pkg/compactor/visit_marker_test.go | 20 ++++++--------- 6 files changed, 21 insertions(+), 47 deletions(-) diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index e42364c085..04a08cfa64 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -296,7 +296,8 @@ compactor: [block_visit_marker_file_update_interval: | default = 1m] # How long cleaner visit marker file should be considered as expired and able - # to be picked up by cleaner again. + # to be picked up by cleaner again. The value should be smaller than + # -compactor.cleanup-interval # CLI flag: -compactor.cleaner-visit-marker-timeout [cleaner_visit_marker_timeout: | default = 10m] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index a3465db331..b83996a4c2 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2233,7 +2233,8 @@ sharding_ring: [block_visit_marker_file_update_interval: | default = 1m] # How long cleaner visit marker file should be considered as expired and able to -# be picked up by cleaner again. +# be picked up by cleaner again. The value should be smaller than +# -compactor.cleanup-interval # CLI flag: -compactor.cleaner-visit-marker-timeout [cleaner_visit_marker_timeout: | default = 10m] diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 633ce7e4e2..3ea46a5f38 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -66,8 +66,6 @@ type BlocksCleaner struct { blocksCleanedTotal prometheus.Counter blocksFailedTotal prometheus.Counter blocksMarkedForDeletion *prometheus.CounterVec - cleanerVisitMarkerReadFailed prometheus.Counter - cleanerVisitMarkerWriteFailed prometheus.Counter tenantBlocks *prometheus.GaugeVec tenantBlocksMarkedForDelete *prometheus.GaugeVec tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec @@ -123,14 +121,6 @@ func NewBlocksCleaner( Help: "Total number of blocks failed to be deleted.", }), blocksMarkedForDeletion: blocksMarkedForDeletion, - cleanerVisitMarkerReadFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_cleaner_visit_marker_read_failed_total", - Help: "Total number of cleaner visit marker file failed to be read.", - }), - cleanerVisitMarkerWriteFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_cleaner_visit_marker_write_failed_total", - Help: "Total number of cleaner visit marker file failed to be written.", - }), // The following metrics don't have the "cortex_compactor" prefix because not strictly related to // the compactor. They're just tracked by the compactor because it's the most logical place where these @@ -346,7 +336,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) { cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID) - visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker, c.cleanerVisitMarkerReadFailed, c.cleanerVisitMarkerWriteFailed) + visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker) existingCleanerVisitMarker := &CleanerVisitMarker{} err = visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 6ca567a4e8..ff7907fe2c 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -259,7 +259,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.") f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.") - f.DurationVar(&cfg.CleanerVisitMarkerTimeout, "compactor.cleaner-visit-marker-timeout", 10*time.Minute, "How long cleaner visit marker file should be considered as expired and able to be picked up by cleaner again.") + f.DurationVar(&cfg.CleanerVisitMarkerTimeout, "compactor.cleaner-visit-marker-timeout", 10*time.Minute, "How long cleaner visit marker file should be considered as expired and able to be picked up by cleaner again. The value should be smaller than -compactor.cleanup-interval") f.DurationVar(&cfg.CleanerVisitMarkerFileUpdateInterval, "compactor.cleaner-visit-marker-file-update-interval", 5*time.Minute, "How frequently cleaner visit marker file should be updated when cleaning user.") f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.") diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go index a6329179f7..ebe675556d 100644 --- a/pkg/compactor/visit_marker.go +++ b/pkg/compactor/visit_marker.go @@ -11,7 +11,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/compact" @@ -41,12 +40,10 @@ type VisitMarker interface { } type VisitMarkerManager struct { - bkt objstore.InstrumentedBucket - logger log.Logger - ownerIdentifier string - visitMarker VisitMarker - visitMarkerReadFailed prometheus.Counter - visitMarkerWriteFailed prometheus.Counter + bkt objstore.InstrumentedBucket + logger log.Logger + ownerIdentifier string + visitMarker VisitMarker } func NewVisitMarkerManager( @@ -54,16 +51,12 @@ func NewVisitMarkerManager( logger log.Logger, ownerIdentifier string, visitMarker VisitMarker, - visitMarkerReadFailed prometheus.Counter, - visitMarkerWriteFailed prometheus.Counter, ) *VisitMarkerManager { return &VisitMarkerManager{ - bkt: bkt, - logger: log.With(logger, "type", fmt.Sprintf("%T", visitMarker)), - ownerIdentifier: ownerIdentifier, - visitMarker: visitMarker, - visitMarkerReadFailed: visitMarkerReadFailed, - visitMarkerWriteFailed: visitMarkerWriteFailed, + bkt: bkt, + logger: log.With(logger, "type", fmt.Sprintf("%T", visitMarker)), + ownerIdentifier: ownerIdentifier, + visitMarker: visitMarker, } } @@ -130,17 +123,14 @@ func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker an if v.bkt.IsObjNotFoundErr(err) { return errors.Wrapf(errorVisitMarkerNotFound, "visit marker file: %s", visitMarkerFile) } - v.visitMarkerReadFailed.Inc() return errors.Wrapf(err, "get visit marker file: %s", visitMarkerFile) } defer runutil.CloseWithLogOnErr(v.getLogger(), visitMarkerFileReader, "close visit marker reader") b, err := io.ReadAll(visitMarkerFileReader) if err != nil { - v.visitMarkerReadFailed.Inc() return errors.Wrapf(err, "read visit marker file: %s", visitMarkerFile) } if err = json.Unmarshal(b, visitMarker); err != nil { - v.visitMarkerReadFailed.Inc() return errors.Wrapf(errorUnmarshalVisitMarker, "visit marker file: %s, content: %s, error: %v", visitMarkerFile, string(b), err.Error()) } level.Debug(v.getLogger()).Log("msg", "visit marker read from file", "visit_marker_file", visitMarkerFile) @@ -150,13 +140,11 @@ func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker an func (v *VisitMarkerManager) updateVisitMarker(ctx context.Context) error { visitMarkerFileContent, err := json.Marshal(v.visitMarker) if err != nil { - v.visitMarkerWriteFailed.Inc() return err } reader := bytes.NewReader(visitMarkerFileContent) if err := v.bkt.Upload(ctx, v.visitMarker.GetVisitMarkerFilePath(), reader); err != nil { - v.visitMarkerWriteFailed.Inc() return err } return nil diff --git a/pkg/compactor/visit_marker_test.go b/pkg/compactor/visit_marker_test.go index 5e60d32b8f..2e3eae5b60 100644 --- a/pkg/compactor/visit_marker_test.go +++ b/pkg/compactor/visit_marker_test.go @@ -9,7 +9,6 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/compact" @@ -19,14 +18,13 @@ import ( func TestMarkPending(t *testing.T) { ctx := context.Background() - dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) logger := log.NewNopLogger() ownerIdentifier := "test-owner" testVisitMarker := NewTestVisitMarker(ownerIdentifier) - visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker) visitMarkerManager.MarkWithStatus(ctx, Pending) require.Equal(t, Pending, testVisitMarker.Status) @@ -39,14 +37,13 @@ func TestMarkPending(t *testing.T) { func TestMarkInProgress(t *testing.T) { ctx := context.Background() - dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) logger := log.NewNopLogger() ownerIdentifier := "test-owner" testVisitMarker := NewTestVisitMarker(ownerIdentifier) - visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker) visitMarkerManager.MarkWithStatus(ctx, InProgress) require.Equal(t, InProgress, testVisitMarker.Status) @@ -59,14 +56,13 @@ func TestMarkInProgress(t *testing.T) { func TestMarkCompleted(t *testing.T) { ctx := context.Background() - dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) logger := log.NewNopLogger() ownerIdentifier := "test-owner" testVisitMarker := NewTestVisitMarker(ownerIdentifier) - visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker) visitMarkerManager.MarkWithStatus(ctx, Completed) require.Equal(t, Completed, testVisitMarker.Status) @@ -79,13 +75,12 @@ func TestMarkCompleted(t *testing.T) { func TestUpdateExistingVisitMarker(t *testing.T) { ctx := context.Background() - dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) logger := log.NewNopLogger() ownerIdentifier1 := "test-owner-1" testVisitMarker1 := NewTestVisitMarker(ownerIdentifier1) - visitMarkerManager1 := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier1, testVisitMarker1, dummyCounter, dummyCounter) + visitMarkerManager1 := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier1, testVisitMarker1) visitMarkerManager1.MarkWithStatus(ctx, InProgress) ownerIdentifier2 := "test-owner-2" @@ -94,7 +89,7 @@ func TestUpdateExistingVisitMarker(t *testing.T) { markerID: testVisitMarker1.markerID, StoredValue: testVisitMarker1.StoredValue, } - visitMarkerManager2 := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier2, testVisitMarker2, dummyCounter, dummyCounter) + visitMarkerManager2 := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier2, testVisitMarker2) visitMarkerManager2.MarkWithStatus(ctx, Completed) visitMarkerFromFile := &TestVisitMarker{} @@ -164,7 +159,6 @@ func TestHeartBeat(t *testing.T) { } { t.Run(tcase.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) logger := log.NewNopLogger() errChan := make(chan error, 1) @@ -172,7 +166,7 @@ func TestHeartBeat(t *testing.T) { ownerIdentifier := "test-owner" testVisitMarker := NewTestVisitMarker(ownerIdentifier) resultTestVisitMarker := CopyTestVisitMarker(testVisitMarker) - visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker, dummyCounter, dummyCounter) + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker) go visitMarkerManager.HeartBeat(ctx, errChan, time.Second, tcase.deleteOnExit) time.Sleep(2 * time.Second) @@ -189,7 +183,7 @@ func TestHeartBeat(t *testing.T) { require.NoError(t, err) require.False(t, exists) } else { - resultVisitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, resultTestVisitMarker, dummyCounter, dummyCounter) + resultVisitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, resultTestVisitMarker) err := resultVisitMarkerManager.ReadVisitMarker(context.Background(), resultTestVisitMarker) require.NoError(t, err) require.Equal(t, tcase.expectedStatus, resultTestVisitMarker.Status)