Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
  • Loading branch information
MichelHollands committed Apr 6, 2022
1 parent f9addc8 commit 85a32ea
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 13 deletions.
32 changes: 19 additions & 13 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,23 +229,27 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
return err
}

deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")

c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
if err != nil {
return err
}

if c.deleteMode == deletion.WholeStreamDeletion {
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)

c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")

c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, c.expirationChecker, chunkClient, r)
c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
if err != nil {
return err
}
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)
c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
} else {
c.expirationChecker = newExpirationChecker(
retention.NewExpirationChecker(limits),
// This is a dummy deletion ExpirationChecker that never expires anything
retention.NeverExpiringExpirationChecker(limits),
)
}

c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, c.expirationChecker, chunkClient, r)
if err != nil {
return err
}
}

Expand Down Expand Up @@ -302,7 +306,9 @@ func (c *Compactor) starting(ctx context.Context) (err error) {

func (c *Compactor) loop(ctx context.Context) error {
if c.cfg.RetentionEnabled {
defer c.deleteRequestsStore.Stop()
if c.deleteRequestsStore != nil {
defer c.deleteRequestsStore.Stop()
}
if c.deleteRequestsManager != nil {
defer c.deleteRequestsManager.Stop()
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/stores/shipper/compactor/retention/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,26 @@ func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval
return interval.Start.Before(latestRetentionStartTime)
}

// NeverExpiringExpirationChecker returns an expiration checker that never expires anything
func NeverExpiringExpirationChecker(limits Limits) ExpirationChecker {
return &neverExpiringExpirationChecker{}
}

type neverExpiringExpirationChecker struct{}

func (e *neverExpiringExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) {
return false, nil
}
func (e *neverExpiringExpirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
return false
}
func (e *neverExpiringExpirationChecker) MarkPhaseStarted() {}
func (e *neverExpiringExpirationChecker) MarkPhaseFailed() {}
func (e *neverExpiringExpirationChecker) MarkPhaseFinished() {}
func (e *neverExpiringExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool {
return false
}

type TenantsRetention struct {
limits Limits
}
Expand Down

0 comments on commit 85a32ea

Please sign in to comment.