From 36d61650f4437a40ee599f2d36ad05570c4786f6 Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Wed, 3 Mar 2021 05:11:16 +0800 Subject: [PATCH] KAFKA-12177: apply log start offset retention before time and size based retention (#10216) Log start offset retention is the cheapest retention to evaluate and does not require access to maxTimestamp fields for segments, nor segment sizes. In addition, it may unblock other types of retention such as time based retention. Without this change retention is not idempotent. It's possible for one deleteOldSegments call to delete segments due to log start offset retention, and a follow up call to delete due to time based retention, even if the time has not changed. Reviewers: Jun Rao --- core/src/main/scala/kafka/log/Log.scala | 4 +++- .../test/scala/unit/kafka/log/LogTest.scala | 21 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index f9486bb3dbd97..5ab2fac3e75b2 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1889,7 +1889,9 @@ class Log(@volatile private var _dir: File, */ def deleteOldSegments(): Int = { if (config.delete) { - deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments() + deleteLogStartOffsetBreachedSegments() + + deleteRetentionSizeBreachedSegments() + + deleteRetentionMsBreachedSegments() } else { deleteLogStartOffsetBreachedSegments() } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 66ee5d2538c20..a76345fd730e5 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -1515,6 +1515,27 @@ class LogTest { "expect a single producer state snapshot remaining") } + @Test + def testRetentionIdempotency(): Unit = { + val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0) + val log = createLog(logDir, logConfig) + + log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes))), leaderEpoch = 0) + log.roll() + log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes))), leaderEpoch = 0) + log.roll() + log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes))), leaderEpoch = 0) + + mockTime.sleep(901) + + log.updateHighWatermark(log.logEndOffset) + log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion) + assertEquals(2, log.deleteOldSegments(), + "Expecting two segment deletions as log start offset retention should unblock time based retention") + assertEquals(0, log.deleteOldSegments()) + } + + @Test def testLogStartOffsetMovementDeletesSnapshots(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)