diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 2249b5eccbbe3..33a416ef3c5d9 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1864,7 +1864,9 @@ class Log(@volatile private var _dir: File, */ def deleteOldSegments(): Int = { if (config.delete) { - deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments() + deleteLogStartOffsetBreachedSegments() + + deleteRetentionSizeBreachedSegments() + + deleteRetentionMsBreachedSegments() } else { deleteLogStartOffsetBreachedSegments() } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1a953c53d5d9b..9fdede45249f8 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -1515,6 +1515,27 @@ class LogTest { "expect a single producer state snapshot remaining") } + @Test + def testRetentionIdempotency(): Unit = { + val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0) + val log = createLog(logDir, logConfig) + + log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes))), leaderEpoch = 0) + log.roll() + log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes))), leaderEpoch = 0) + log.roll() + log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes))), leaderEpoch = 0) + + mockTime.sleep(901) + + log.updateHighWatermark(log.logEndOffset) + log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion) + assertEquals(2, log.deleteOldSegments(), + "Expecting two segment deletions as log start offset retention should unblock time based retention") + assertEquals(0, log.deleteOldSegments()) + } + + @Test def testLogStartOffsetMovementDeletesSnapshots(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)