Skip to content

Commit

Permalink
KAFKA-12177: apply log start offset retention before time and size ba…
Browse files Browse the repository at this point in the history
…sed retention (#10216)

Log start offset retention is the cheapest retention to evaluate and does not require access to maxTimestamp fields for segments, nor segment sizes. In addition, it may unblock other types of retention such as time based retention. Without this change retention is not idempotent. It's possible for one deleteOldSegments call to delete segments due to log start offset retention, and a follow up call to delete due to time based retention, even if the time has not changed.

Reviewers: Jun Rao <junrao@gmail.com>
  • Loading branch information
lbradstreet authored Mar 2, 2021
1 parent 3708a7c commit 36d6165
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1889,7 +1889,9 @@ class Log(@volatile private var _dir: File,
*/
def deleteOldSegments(): Int = {
if (config.delete) {
deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
deleteLogStartOffsetBreachedSegments() +
deleteRetentionSizeBreachedSegments() +
deleteRetentionMsBreachedSegments()
} else {
deleteLogStartOffsetBreachedSegments()
}
Expand Down
21 changes: 21 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,27 @@ class LogTest {
"expect a single producer state snapshot remaining")
}

@Test
def testRetentionIdempotency(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
val log = createLog(logDir, logConfig)

log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes))), leaderEpoch = 0)
log.roll()
log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes))), leaderEpoch = 0)
log.roll()
log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes))), leaderEpoch = 0)

mockTime.sleep(901)

log.updateHighWatermark(log.logEndOffset)
log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion)
assertEquals(2, log.deleteOldSegments(),
"Expecting two segment deletions as log start offset retention should unblock time based retention")
assertEquals(0, log.deleteOldSegments())
}


@Test
def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
Expand Down

0 comments on commit 36d6165

Please sign in to comment.