-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
use default retention period to check user index may have expired chunks when user does not have custom retention #5261
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,8 +22,15 @@ type ExpirationChecker interface { | |
} | ||
|
||
type expirationChecker struct { | ||
tenantsRetention *TenantsRetention | ||
latestRetentionStartTime model.Time | ||
tenantsRetention *TenantsRetention | ||
// overallLatestRetentionStartTime holds latest retention start time considering both default and per user retention config. | ||
// It is used to determine if common index table may have any expired chunks. | ||
overallLatestRetentionStartTime model.Time | ||
// defaultLatestRetentionStartTime holds latest retention start time considering only default retention config. | ||
// It is used to determine if user index table may have any expired chunks when the user does not have any custom retention config set. | ||
defaultLatestRetentionStartTime model.Time | ||
// userLatestRetentionStartTime holds latest retention start time considering only per user retention config. | ||
// It is used to determine if user index table may have any expired chunks. | ||
latestRetentionStartTimeByUser map[string]model.Time | ||
} | ||
|
||
|
@@ -57,19 +64,25 @@ func (e *expirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Tim | |
} | ||
|
||
func (e *expirationChecker) MarkPhaseStarted() { | ||
e.latestRetentionStartTime, e.latestRetentionStartTimeByUser = findLatestRetentionStartTime(model.Now(), e.tenantsRetention.limits) | ||
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("smallest retention period %v", e.latestRetentionStartTime)) | ||
e.overallLatestRetentionStartTime, e.defaultLatestRetentionStartTime, e.latestRetentionStartTimeByUser = findLatestRetentionStartTime(model.Now(), e.tenantsRetention.limits) | ||
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("overall smallest retention period %v, default smallest retention period %v", e.overallLatestRetentionStartTime, e.defaultLatestRetentionStartTime)) | ||
} | ||
|
||
func (e *expirationChecker) MarkPhaseFailed() {} | ||
func (e *expirationChecker) MarkPhaseFinished() {} | ||
|
||
func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool { | ||
latestRetentionStartTime := e.latestRetentionStartTime | ||
// when userID is empty, it means we are checking for common index table. In this case we use e.overallLatestRetentionStartTime. | ||
latestRetentionStartTime := e.overallLatestRetentionStartTime | ||
if userID != "" { | ||
// when userID is not empty, it means we are checking for user index table. | ||
latestRetentionStartTimeForUser, ok := e.latestRetentionStartTimeByUser[userID] | ||
if ok { | ||
// user has custom retention config, let us use user specific latest retention start time. | ||
latestRetentionStartTime = latestRetentionStartTimeForUser | ||
} else { | ||
// user does not have custom retention config, let us use default latest retention start time. | ||
latestRetentionStartTime = e.defaultLatestRetentionStartTime | ||
} | ||
} | ||
return interval.Start.Before(latestRetentionStartTime) | ||
|
@@ -119,17 +132,19 @@ Outer: | |
return globalRetention | ||
} | ||
|
||
// findLatestRetentionStartTime returns the latest retention start time overall and by each user. | ||
func findLatestRetentionStartTime(now model.Time, limits Limits) (model.Time, map[string]model.Time) { | ||
// findLatestRetentionStartTime returns the latest retention start time overall, just default config and by each user. | ||
func findLatestRetentionStartTime(now model.Time, limits Limits) (model.Time, model.Time, map[string]model.Time) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I'm not happy with the return here anymore. I think in the future we should make that a struct with proper names, this would avoid confusion. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. makes sense, pushed this change |
||
// find the smallest retention period from default limits | ||
defaultLimits := limits.DefaultLimits() | ||
smallestRetentionPeriod := defaultLimits.RetentionPeriod | ||
smallestDefaultRetentionPeriod := defaultLimits.RetentionPeriod | ||
for _, streamRetention := range defaultLimits.StreamRetention { | ||
cyriltovena marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if streamRetention.Period < smallestRetentionPeriod { | ||
smallestRetentionPeriod = streamRetention.Period | ||
if streamRetention.Period < smallestDefaultRetentionPeriod { | ||
smallestDefaultRetentionPeriod = streamRetention.Period | ||
} | ||
} | ||
|
||
overallSmallestRetentionPeriod := smallestDefaultRetentionPeriod | ||
|
||
// find the smallest retention period by user | ||
limitsByUserID := limits.AllByUserID() | ||
smallestRetentionPeriodByUser := make(map[string]model.Time, len(limitsByUserID)) | ||
|
@@ -141,12 +156,12 @@ func findLatestRetentionStartTime(now model.Time, limits Limits) (model.Time, ma | |
} | ||
} | ||
|
||
// update the common smallestRetentionPeriod if this user has smaller value | ||
// update the overallSmallestRetentionPeriod if this user has smaller value | ||
smallestRetentionPeriodByUser[userID] = now.Add(time.Duration(-smallestRetentionPeriodForUser)) | ||
if smallestRetentionPeriodForUser < smallestRetentionPeriod { | ||
smallestRetentionPeriod = smallestRetentionPeriodForUser | ||
if smallestRetentionPeriodForUser < overallSmallestRetentionPeriod { | ||
overallSmallestRetentionPeriod = smallestRetentionPeriodForUser | ||
} | ||
} | ||
|
||
return now.Add(time.Duration(-smallestRetentionPeriod)), smallestRetentionPeriodByUser | ||
return now.Add(time.Duration(-overallSmallestRetentionPeriod)), now.Add(time.Duration(-smallestDefaultRetentionPeriod)), smallestRetentionPeriodByUser | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.