Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use default retention period to check user index may have expired chunks when user does not have custom retention #5261

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions pkg/storage/stores/shipper/compactor/retention/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ type ExpirationChecker interface {
}

// expirationChecker holds the per-tenant retention settings together with the
// latest retention start times that MarkPhaseStarted recomputes from the limits.
type expirationChecker struct {
tenantsRetention *TenantsRetention
latestRetentionStartTime model.Time
latestRetentionStartTimeByUser map[string]model.Time
tenantsRetention *TenantsRetention
latestRetentionStartTime latestRetentionStartTime
}

type Limits interface {
Expand Down Expand Up @@ -57,19 +56,26 @@ func (e *expirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Tim
}

// MarkPhaseStarted recomputes the latest retention start times from the
// tenant limits, using model.Now() as the reference time, and logs them.
func (e *expirationChecker) MarkPhaseStarted() {
e.latestRetentionStartTime, e.latestRetentionStartTimeByUser = findLatestRetentionStartTime(model.Now(), e.tenantsRetention.limits)
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("smallest retention period %v", e.latestRetentionStartTime))
e.latestRetentionStartTime = findLatestRetentionStartTime(model.Now(), e.tenantsRetention.limits)
// NOTE(review): the values logged here are timestamps (reference time minus
// the smallest retention period), not durations, despite the
// "retention period" wording of the message.
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("overall smallest retention period %v, default smallest retention period %v",
e.latestRetentionStartTime.overall, e.latestRetentionStartTime.defaults))
}

// MarkPhaseFailed is a no-op for expirationChecker.
func (e *expirationChecker) MarkPhaseFailed() {}
// MarkPhaseFinished is a no-op for expirationChecker.
func (e *expirationChecker) MarkPhaseFinished() {}

func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
latestRetentionStartTime := e.latestRetentionStartTime
// when userID is empty, it means we are checking for common index table. In this case we use e.latestRetentionStartTime.overall.
latestRetentionStartTime := e.latestRetentionStartTime.overall
if userID != "" {
latestRetentionStartTimeForUser, ok := e.latestRetentionStartTimeByUser[userID]
// when userID is not empty, it means we are checking for user index table.
latestRetentionStartTimeForUser, ok := e.latestRetentionStartTime.byUser[userID]
if ok {
// user has custom retention config, let us use user specific latest retention start time.
latestRetentionStartTime = latestRetentionStartTimeForUser
} else {
// user does not have custom retention config, let us use default latest retention start time.
latestRetentionStartTime = e.latestRetentionStartTime.defaults
}
}
return interval.Start.Before(latestRetentionStartTime)
Expand Down Expand Up @@ -119,17 +125,31 @@ Outer:
return globalRetention
}

// findLatestRetentionStartTime returns the latest retention start time overall and by each user.
func findLatestRetentionStartTime(now model.Time, limits Limits) (model.Time, map[string]model.Time) {
// latestRetentionStartTime groups the retention cut-off timestamps computed by
// findLatestRetentionStartTime. Each value is the reference time minus the
// smallest applicable retention period; IntervalMayHaveExpiredChunks reports an
// interval as possibly containing expired chunks when it starts before the
// relevant value.
type latestRetentionStartTime struct {
// defaults holds latest retention start time considering only default retention config.
// It is used to determine if user index table may have any expired chunks when the user does not have any custom retention config set.
defaults model.Time
// overall holds latest retention start time for all users considering both default and per user retention config.
// It is used to determine if common index table may have any expired chunks.
overall model.Time
// byUser holds latest retention start time per user, considering only that user's retention config.
// It is used to determine if user index table may have any expired chunks when the user has an entry here;
// users without an entry fall back to defaults.
byUser map[string]model.Time
}

// findLatestRetentionStartTime returns the latest retention start time overall, for just the default config, and for each user.
func findLatestRetentionStartTime(now model.Time, limits Limits) latestRetentionStartTime {
// find the smallest retention period from default limits
defaultLimits := limits.DefaultLimits()
smallestRetentionPeriod := defaultLimits.RetentionPeriod
smallestDefaultRetentionPeriod := defaultLimits.RetentionPeriod
for _, streamRetention := range defaultLimits.StreamRetention {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
if streamRetention.Period < smallestRetentionPeriod {
smallestRetentionPeriod = streamRetention.Period
if streamRetention.Period < smallestDefaultRetentionPeriod {
smallestDefaultRetentionPeriod = streamRetention.Period
}
}

overallSmallestRetentionPeriod := smallestDefaultRetentionPeriod

// find the smallest retention period by user
limitsByUserID := limits.AllByUserID()
smallestRetentionPeriodByUser := make(map[string]model.Time, len(limitsByUserID))
Expand All @@ -141,12 +161,16 @@ func findLatestRetentionStartTime(now model.Time, limits Limits) (model.Time, ma
}
}

// update the common smallestRetentionPeriod if this user has smaller value
// update the overallSmallestRetentionPeriod if this user has smaller value
smallestRetentionPeriodByUser[userID] = now.Add(time.Duration(-smallestRetentionPeriodForUser))
if smallestRetentionPeriodForUser < smallestRetentionPeriod {
smallestRetentionPeriod = smallestRetentionPeriodForUser
if smallestRetentionPeriodForUser < overallSmallestRetentionPeriod {
overallSmallestRetentionPeriod = smallestRetentionPeriodForUser
}
}

return now.Add(time.Duration(-smallestRetentionPeriod)), smallestRetentionPeriodByUser
return latestRetentionStartTime{
defaults: now.Add(time.Duration(-smallestDefaultRetentionPeriod)),
overall: now.Add(time.Duration(-overallSmallestRetentionPeriod)),
byUser: smallestRetentionPeriodByUser,
}
}
161 changes: 117 additions & 44 deletions pkg/storage/stores/shipper/compactor/retention/expiration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,9 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
const dayDuration = 24 * time.Hour
now := model.Now()
for _, tc := range []struct {
name string
limit fakeLimits
expectedLatestRetentionStartTime model.Time
expectedLatestRetentionStartTimeByUser map[string]model.Time
name string
limit fakeLimits
expectedLatestRetentionStartTime latestRetentionStartTime
}{
{
name: "only default retention set",
Expand All @@ -104,8 +103,11 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
retentionPeriod: 7 * dayDuration,
},
},
expectedLatestRetentionStartTime: now.Add(-7 * dayDuration),
expectedLatestRetentionStartTimeByUser: map[string]model.Time{},
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-7 * dayDuration),
defaults: now.Add(-7 * dayDuration),
byUser: map[string]model.Time{},
},
},
{
name: "default retention period smallest",
Expand All @@ -123,10 +125,13 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
"1": {retentionPeriod: 15 * dayDuration},
},
},
expectedLatestRetentionStartTime: now.Add(-7 * dayDuration),
expectedLatestRetentionStartTimeByUser: map[string]model.Time{
"0": now.Add(-12 * dayDuration),
"1": now.Add(-15 * dayDuration),
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-7 * dayDuration),
defaults: now.Add(-7 * dayDuration),
byUser: map[string]model.Time{
"0": now.Add(-12 * dayDuration),
"1": now.Add(-15 * dayDuration),
},
},
},
{
Expand All @@ -145,10 +150,13 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
"1": {retentionPeriod: 5 * dayDuration},
},
},
expectedLatestRetentionStartTime: now.Add(-3 * dayDuration),
expectedLatestRetentionStartTimeByUser: map[string]model.Time{
"0": now.Add(-7 * dayDuration),
"1": now.Add(-5 * dayDuration),
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-3 * dayDuration),
defaults: now.Add(-3 * dayDuration),
byUser: map[string]model.Time{
"0": now.Add(-7 * dayDuration),
"1": now.Add(-5 * dayDuration),
},
},
},
{
Expand Down Expand Up @@ -181,10 +189,13 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
},
},
},
expectedLatestRetentionStartTime: now.Add(-5 * dayDuration),
expectedLatestRetentionStartTimeByUser: map[string]model.Time{
"0": now.Add(-10 * dayDuration),
"1": now.Add(-5 * dayDuration),
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-5 * dayDuration),
defaults: now.Add(-7 * dayDuration),
byUser: map[string]model.Time{
"0": now.Add(-10 * dayDuration),
"1": now.Add(-5 * dayDuration),
},
},
},
{
Expand Down Expand Up @@ -217,63 +228,125 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
},
},
},
expectedLatestRetentionStartTime: now.Add(-2 * dayDuration),
expectedLatestRetentionStartTimeByUser: map[string]model.Time{
"0": now.Add(-10 * dayDuration),
"1": now.Add(-2 * dayDuration),
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-2 * dayDuration),
defaults: now.Add(-7 * dayDuration),
byUser: map[string]model.Time{
"0": now.Add(-10 * dayDuration),
"1": now.Add(-2 * dayDuration),
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
latestRetentionStartTime, latestRetentionStartTimeByUser := findLatestRetentionStartTime(now, tc.limit)
latestRetentionStartTime := findLatestRetentionStartTime(now, tc.limit)
require.Equal(t, tc.expectedLatestRetentionStartTime, latestRetentionStartTime)
require.Equal(t, tc.expectedLatestRetentionStartTimeByUser, latestRetentionStartTimeByUser)
})
}
}

func TestExpirationChecker_IntervalMayHaveExpiredChunks(t *testing.T) {
now := model.Now()
expirationChecker := expirationChecker{
latestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-24 * time.Hour),
defaults: now.Add(-48 * time.Hour),
byUser: map[string]model.Time{
"user0": now.Add(-72 * time.Hour),
"user1": now.Add(-24 * time.Hour),
},
},
}

for _, tc := range []struct {
name string
expirationChecker expirationChecker
interval model.Interval
hasExpiredChunks bool
name string
userID string
interval model.Interval
hasExpiredChunks bool
}{
// common index using latestRetentionStartTime.overall
{
name: "not expired",
expirationChecker: expirationChecker{
latestRetentionStartTime: model.Now().Add(-24 * time.Hour),
name: "common index - not expired",
interval: model.Interval{
Start: now.Add(-23 * time.Hour),
End: now,
},
},
{
name: "common index - partially expired",
interval: model.Interval{
Start: model.Now().Add(-time.Hour),
End: model.Now(),
Start: now.Add(-25 * time.Hour),
End: now.Add(-22 * time.Hour),
},
hasExpiredChunks: true,
},
{
name: "partially expired",
expirationChecker: expirationChecker{
latestRetentionStartTime: model.Now().Add(-24 * time.Hour),
name: "common index - fully expired",
interval: model.Interval{
Start: now.Add(-26 * time.Hour),
End: now.Add(-25 * time.Hour),
},
hasExpiredChunks: true,
},

// user0 having custom retention
{
name: "user0 index - not expired",
userID: "user0",
interval: model.Interval{
Start: model.Now().Add(-25 * time.Hour),
End: model.Now().Add(-22 * time.Hour),
Start: now.Add(-71 * time.Hour),
End: now,
},
},
{
name: "user0 index - partially expired",
userID: "user0",
interval: model.Interval{
Start: now.Add(-73 * time.Hour),
End: now.Add(-71 * time.Hour),
},
hasExpiredChunks: true,
},
{
name: "user0 index - fully expired",
userID: "user0",
interval: model.Interval{
Start: now.Add(-74 * time.Hour),
End: now.Add(-73 * time.Hour),
},
hasExpiredChunks: true,
},

// user3 not having custom retention so using latestRetentionStartTime.defaults
{
name: "fully expired",
expirationChecker: expirationChecker{
latestRetentionStartTime: model.Now().Add(-24 * time.Hour),
name: "user3 index - not expired",
userID: "user3",
interval: model.Interval{
Start: now.Add(-47 * time.Hour),
End: now,
},
},
{
name: "user3 index - partially expired",
userID: "user3",
interval: model.Interval{
Start: now.Add(-49 * time.Hour),
End: now.Add(-47 * time.Hour),
},
hasExpiredChunks: true,
},
{
name: "user3 index - fully expired",
userID: "user3",
interval: model.Interval{
Start: model.Now().Add(-26 * time.Hour),
End: model.Now().Add(-25 * time.Hour),
Start: now.Add(-50 * time.Hour),
End: now.Add(-49 * time.Hour),
},
hasExpiredChunks: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.hasExpiredChunks, tc.expirationChecker.IntervalMayHaveExpiredChunks(tc.interval, ""))
require.Equal(t, tc.hasExpiredChunks, expirationChecker.IntervalMayHaveExpiredChunks(tc.interval, tc.userID))
})
}
}