diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 92a1206c589b..e7d920277484 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -229,23 +229,27 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage return err } - deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion") - - c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient) - if err != nil { - return err - } - if c.deleteMode == deletion.WholeStreamDeletion { - c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r) - c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r) - - c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager) + deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion") - c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, c.expirationChecker, chunkClient, r) + c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient) if err != nil { return err } + c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r) + c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r) + c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager) + } else { + c.expirationChecker = newExpirationChecker( + retention.NewExpirationChecker(limits), + // This is a dummy deletion ExpirationChecker that never expires anything + retention.NeverExpiringExpirationChecker(limits), + ) + } + + c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, c.expirationChecker, chunkClient, r) + if err != nil { + return err } } @@ -302,7 +306,9 @@ func (c *Compactor) starting(ctx context.Context) (err error) { func (c *Compactor) loop(ctx context.Context) error { if c.cfg.RetentionEnabled { - defer c.deleteRequestsStore.Stop() + if c.deleteRequestsStore != nil { + defer c.deleteRequestsStore.Stop() + } if c.deleteRequestsManager != nil { defer c.deleteRequestsManager.Stop() } diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/shipper/compactor/retention/expiration.go index 4d9edaac19c8..e9aee2744027 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration.go @@ -81,6 +81,26 @@ func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval return interval.Start.Before(latestRetentionStartTime) } +// NeverExpiringExpirationChecker returns an expiration checker that never expires anything +func NeverExpiringExpirationChecker(limits Limits) ExpirationChecker { + return &neverExpiringExpirationChecker{} +} + +type neverExpiringExpirationChecker struct{} + +func (e *neverExpiringExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) { + return false, nil +} +func (e *neverExpiringExpirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool { + return false +} +func (e *neverExpiringExpirationChecker) MarkPhaseStarted() {} +func (e *neverExpiringExpirationChecker) MarkPhaseFailed() {} +func (e *neverExpiringExpirationChecker) MarkPhaseFinished() {} +func (e *neverExpiringExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool { + return false +} + type TenantsRetention struct { limits Limits }