diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/shipper/compactor/retention/expiration.go index 87c4e6d6ec05..4d9edaac19c8 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration.go @@ -22,9 +22,8 @@ type ExpirationChecker interface { } type expirationChecker struct { - tenantsRetention *TenantsRetention - latestRetentionStartTime model.Time - latestRetentionStartTimeByUser map[string]model.Time + tenantsRetention *TenantsRetention + latestRetentionStartTime latestRetentionStartTime } type Limits interface { @@ -57,19 +56,26 @@ func (e *expirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Tim } func (e *expirationChecker) MarkPhaseStarted() { - e.latestRetentionStartTime, e.latestRetentionStartTimeByUser = findLatestRetentionStartTime(model.Now(), e.tenantsRetention.limits) - level.Info(util_log.Logger).Log("msg", fmt.Sprintf("smallest retention period %v", e.latestRetentionStartTime)) + e.latestRetentionStartTime = findLatestRetentionStartTime(model.Now(), e.tenantsRetention.limits) + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("overall smallest retention period %v, default smallest retention period %v", + e.latestRetentionStartTime.overall, e.latestRetentionStartTime.defaults)) } func (e *expirationChecker) MarkPhaseFailed() {} func (e *expirationChecker) MarkPhaseFinished() {} func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool { - latestRetentionStartTime := e.latestRetentionStartTime + // when userID is empty, it means we are checking for common index table. In this case we use e.overallLatestRetentionStartTime. + latestRetentionStartTime := e.latestRetentionStartTime.overall if userID != "" { - latestRetentionStartTimeForUser, ok := e.latestRetentionStartTimeByUser[userID] + // when userID is not empty, it means we are checking for user index table. + latestRetentionStartTimeForUser, ok := e.latestRetentionStartTime.byUser[userID] if ok { + // user has custom retention config, let us use user specific latest retention start time. latestRetentionStartTime = latestRetentionStartTimeForUser + } else { + // user does not have custom retention config, let us use default latest retention start time. + latestRetentionStartTime = e.latestRetentionStartTime.defaults } } return interval.Start.Before(latestRetentionStartTime) @@ -119,17 +125,31 @@ Outer: return globalRetention } -// findLatestRetentionStartTime returns the latest retention start time overall and by each user. -func findLatestRetentionStartTime(now model.Time, limits Limits) (model.Time, map[string]model.Time) { +type latestRetentionStartTime struct { + // defaults holds latest retention start time considering only default retention config. + // It is used to determine if user index table may have any expired chunks when the user does not have any custom retention config set. + defaults model.Time + // overall holds latest retention start time for all users considering both default and per user retention config. + // It is used to determine if common index table may have any expired chunks. + overall model.Time + // byUser holds latest retention start time considering only per user retention config. + // It is used to determine if user index table may have any expired chunks. + byUser map[string]model.Time +} + +// findLatestRetentionStartTime returns the latest retention start time overall, just default config and by each user. +func findLatestRetentionStartTime(now model.Time, limits Limits) latestRetentionStartTime { // find the smallest retention period from default limits defaultLimits := limits.DefaultLimits() - smallestRetentionPeriod := defaultLimits.RetentionPeriod + smallestDefaultRetentionPeriod := defaultLimits.RetentionPeriod for _, streamRetention := range defaultLimits.StreamRetention { - if streamRetention.Period < smallestRetentionPeriod { - smallestRetentionPeriod = streamRetention.Period + if streamRetention.Period < smallestDefaultRetentionPeriod { + smallestDefaultRetentionPeriod = streamRetention.Period } } + overallSmallestRetentionPeriod := smallestDefaultRetentionPeriod + // find the smallest retention period by user limitsByUserID := limits.AllByUserID() smallestRetentionPeriodByUser := make(map[string]model.Time, len(limitsByUserID)) @@ -141,12 +161,16 @@ func findLatestRetentionStartTime(now model.Time, limits Limits) (model.Time, ma } } - // update the common smallestRetentionPeriod if this user has smaller value + // update the overallSmallestRetentionPeriod if this user has smaller value smallestRetentionPeriodByUser[userID] = now.Add(time.Duration(-smallestRetentionPeriodForUser)) - if smallestRetentionPeriodForUser < smallestRetentionPeriod { - smallestRetentionPeriod = smallestRetentionPeriodForUser + if smallestRetentionPeriodForUser < overallSmallestRetentionPeriod { + overallSmallestRetentionPeriod = smallestRetentionPeriodForUser } } - return now.Add(time.Duration(-smallestRetentionPeriod)), smallestRetentionPeriodByUser + return latestRetentionStartTime{ + defaults: now.Add(time.Duration(-smallestDefaultRetentionPeriod)), + overall: now.Add(time.Duration(-overallSmallestRetentionPeriod)), + byUser: smallestRetentionPeriodByUser, + } } diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go index 19efe60ab8e4..e51a93cbe0ad 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go @@ -92,10 +92,9 @@ func TestFindLatestRetentionStartTime(t *testing.T) { const dayDuration = 24 * time.Hour now := model.Now() for _, tc := range []struct { - name string - limit fakeLimits - expectedLatestRetentionStartTime model.Time - expectedLatestRetentionStartTimeByUser map[string]model.Time + name string + limit fakeLimits + expectedLatestRetentionStartTime latestRetentionStartTime }{ { name: "only default retention set", @@ -104,8 +103,11 @@ func TestFindLatestRetentionStartTime(t *testing.T) { retentionPeriod: 7 * dayDuration, }, }, - expectedLatestRetentionStartTime: now.Add(-7 * dayDuration), - expectedLatestRetentionStartTimeByUser: map[string]model.Time{}, + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: now.Add(-7 * dayDuration), + defaults: now.Add(-7 * dayDuration), + byUser: map[string]model.Time{}, + }, }, { name: "default retention period smallest", @@ -123,10 +125,13 @@ func TestFindLatestRetentionStartTime(t *testing.T) { "1": {retentionPeriod: 15 * dayDuration}, }, }, - expectedLatestRetentionStartTime: now.Add(-7 * dayDuration), - expectedLatestRetentionStartTimeByUser: map[string]model.Time{ - "0": now.Add(-12 * dayDuration), - "1": now.Add(-15 * dayDuration), + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: now.Add(-7 * dayDuration), + defaults: now.Add(-7 * dayDuration), + byUser: map[string]model.Time{ + "0": now.Add(-12 * dayDuration), + "1": now.Add(-15 * dayDuration), + }, }, }, { @@ -145,10 +150,13 @@ func TestFindLatestRetentionStartTime(t *testing.T) { "1": {retentionPeriod: 5 * dayDuration}, }, }, - expectedLatestRetentionStartTime: now.Add(-3 * dayDuration), - expectedLatestRetentionStartTimeByUser: map[string]model.Time{ - "0": now.Add(-7 * dayDuration), - "1": now.Add(-5 * dayDuration), + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: now.Add(-3 * dayDuration), + defaults: now.Add(-3 * dayDuration), + byUser: map[string]model.Time{ + "0": now.Add(-7 * dayDuration), + "1": now.Add(-5 * dayDuration), + }, }, }, { @@ -181,10 +189,13 @@ func TestFindLatestRetentionStartTime(t *testing.T) { }, }, }, - expectedLatestRetentionStartTime: now.Add(-5 * dayDuration), - expectedLatestRetentionStartTimeByUser: map[string]model.Time{ - "0": now.Add(-10 * dayDuration), - "1": now.Add(-5 * dayDuration), + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: now.Add(-5 * dayDuration), + defaults: now.Add(-7 * dayDuration), + byUser: map[string]model.Time{ + "0": now.Add(-10 * dayDuration), + "1": now.Add(-5 * dayDuration), + }, }, }, { @@ -217,63 +228,125 @@ func TestFindLatestRetentionStartTime(t *testing.T) { }, }, }, - expectedLatestRetentionStartTime: now.Add(-2 * dayDuration), - expectedLatestRetentionStartTimeByUser: map[string]model.Time{ - "0": now.Add(-10 * dayDuration), - "1": now.Add(-2 * dayDuration), + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: now.Add(-2 * dayDuration), + defaults: now.Add(-7 * dayDuration), + byUser: map[string]model.Time{ + "0": now.Add(-10 * dayDuration), + "1": now.Add(-2 * dayDuration), + }, }, }, } { t.Run(tc.name, func(t *testing.T) { - latestRetentionStartTime, latestRetentionStartTimeByUser := findLatestRetentionStartTime(now, tc.limit) + latestRetentionStartTime := findLatestRetentionStartTime(now, tc.limit) require.Equal(t, tc.expectedLatestRetentionStartTime, latestRetentionStartTime) - require.Equal(t, tc.expectedLatestRetentionStartTimeByUser, latestRetentionStartTimeByUser) }) } } func TestExpirationChecker_IntervalMayHaveExpiredChunks(t *testing.T) { + now := model.Now() + expirationChecker := expirationChecker{ + latestRetentionStartTime: latestRetentionStartTime{ + overall: now.Add(-24 * time.Hour), + defaults: now.Add(-48 * time.Hour), + byUser: map[string]model.Time{ + "user0": now.Add(-72 * time.Hour), + "user1": now.Add(-24 * time.Hour), + }, + }, + } + for _, tc := range []struct { - name string - expirationChecker expirationChecker - interval model.Interval - hasExpiredChunks bool + name string + userID string + interval model.Interval + hasExpiredChunks bool }{ + // common index using overallLatestRetentionStartTime { - name: "not expired", - expirationChecker: expirationChecker{ - latestRetentionStartTime: model.Now().Add(-24 * time.Hour), + name: "common index - not expired", + interval: model.Interval{ + Start: now.Add(-23 * time.Hour), + End: now, }, + }, + { + name: "common index - partially expired", interval: model.Interval{ - Start: model.Now().Add(-time.Hour), - End: model.Now(), + Start: now.Add(-25 * time.Hour), + End: now.Add(-22 * time.Hour), }, + hasExpiredChunks: true, }, { - name: "partially expired", - expirationChecker: expirationChecker{ - latestRetentionStartTime: model.Now().Add(-24 * time.Hour), + name: "common index - fully expired", + interval: model.Interval{ + Start: now.Add(-26 * time.Hour), + End: now.Add(-25 * time.Hour), }, + hasExpiredChunks: true, + }, + + // user0 having custom retention + { + name: "user0 index - not expired", + userID: "user0", interval: model.Interval{ - Start: model.Now().Add(-25 * time.Hour), - End: model.Now().Add(-22 * time.Hour), + Start: now.Add(-71 + time.Hour), + End: now, + }, + }, + { + name: "user0 index - partially expired", + userID: "user0", + interval: model.Interval{ + Start: now.Add(-73 * time.Hour), + End: now.Add(-71 * time.Hour), + }, + hasExpiredChunks: true, + }, + { + name: "user0 index - fully expired", + userID: "user0", + interval: model.Interval{ + Start: now.Add(-74 * time.Hour), + End: now.Add(-73 * time.Hour), }, hasExpiredChunks: true, }, + + // user3 not having custom retention so using defaultLatestRetentionStartTime { - name: "fully expired", - expirationChecker: expirationChecker{ - latestRetentionStartTime: model.Now().Add(-24 * time.Hour), + name: "user3 index - not expired", + userID: "user3", + interval: model.Interval{ + Start: now.Add(-47 * time.Hour), + End: now, }, + }, + { + name: "user3 index - partially expired", + userID: "user3", + interval: model.Interval{ + Start: now.Add(-49 * time.Hour), + End: now.Add(-47 * time.Hour), + }, + hasExpiredChunks: true, + }, + { + name: "user3 index - fully expired", + userID: "user3", interval: model.Interval{ - Start: model.Now().Add(-26 * time.Hour), - End: model.Now().Add(-25 * time.Hour), + Start: now.Add(-50 * time.Hour), + End: now.Add(-49 * time.Hour), }, hasExpiredChunks: true, }, } { t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.hasExpiredChunks, tc.expirationChecker.IntervalMayHaveExpiredChunks(tc.interval, "")) + require.Equal(t, tc.hasExpiredChunks, expirationChecker.IntervalMayHaveExpiredChunks(tc.interval, tc.userID)) }) } }