From fc5245d8c37a6c9d585c5792940a8f9501bedbe1 Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Wed, 16 Jun 2021 15:02:50 -0700 Subject: [PATCH] KAFKA-12955: Fix LogLoader to pass materialized view of segments for deletion (#10888) Within LogLoader.removeAndDeleteSegmentsAsync(), we should force materialization of the segmentsToDelete iterable, to make sure the results of the iteration remain valid and deterministic. We should also pass only the materialized view to the logic that deletes the segments, as otherwise we could end up deleting the wrong segments. Reviewers: Jun Rao --- core/src/main/scala/kafka/log/Log.scala | 11 ++--- core/src/main/scala/kafka/log/LogLoader.scala | 11 ++--- .../scala/unit/kafka/log/LogLoaderTest.scala | 41 +++++++++++++++++++ .../test/scala/unit/kafka/log/LogTest.scala | 33 --------------- 4 files changed, 53 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b49bfb46dbdff..a253c17bd0a5b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -45,7 +45,7 @@ import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPar import scala.jdk.CollectionConverters._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} -import scala.collection.{Seq, mutable} +import scala.collection.{Seq, immutable, mutable} object LogAppendInfo { val UnknownLogAppendInfo = LogAppendInfo(None, -1, None, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, @@ -1879,9 +1879,10 @@ class Log(@volatile private var _dir: File, reason: SegmentDeletionReason): Unit = { if (segments.nonEmpty) { lock synchronized { - // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by + // Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by // removing the deleted segment, we should force materialization of the 
iterator here, so that results of the - // iteration remain valid and deterministic. + // iteration remain valid and deterministic. We should also pass only the materialized view of the + // iterator to the logic that actually deletes the segments. val toDelete = segments.toList reason.logReason(this, toDelete) toDelete.foreach { segment => @@ -1892,7 +1893,7 @@ class Log(@volatile private var _dir: File, } } - private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = { + private def deleteSegmentFiles(segments: immutable.Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = { Log.deleteSegmentFiles(segments, asyncDelete, deleteProducerStateSnapshots, dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent) } @@ -2378,7 +2379,7 @@ object Log extends Logging { * @param logPrefix The logging prefix * @throws IOException if the file can't be renamed and still exists */ - private[log] def deleteSegmentFiles(segmentsToDelete: Iterable[LogSegment], + private[log] def deleteSegmentFiles(segmentsToDelete: immutable.Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true, dir: File, diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala index 0a9222e70386f..bde34e03b2d3c 100644 --- a/core/src/main/scala/kafka/log/LogLoader.scala +++ b/core/src/main/scala/kafka/log/LogLoader.scala @@ -510,17 +510,18 @@ object LogLoader extends Logging { private def removeAndDeleteSegmentsAsync(segmentsToDelete: Iterable[LogSegment], params: LoadLogParams): Unit = { if (segmentsToDelete.nonEmpty) { - // As most callers hold an iterator into the `params.segments` collection and - // `removeAndDeleteSegmentAsync` mutates it by removing the deleted segment, we should force - // materialization of the iterator here, so that results of the iteration 
remain valid and - // deterministic. + // Most callers hold an iterator into the `params.segments` collection and + // `removeAndDeleteSegmentAsync` mutates it by removing the deleted segment. Therefore, + // we should force materialization of the iterator here, so that results of the iteration + // remain valid and deterministic. We should also pass only the materialized view of the + // iterator to the logic that deletes the segments. val toDelete = segmentsToDelete.toList info(s"${params.logIdentifier}Deleting segments as part of log recovery: ${toDelete.mkString(",")}") toDelete.foreach { segment => params.segments.remove(segment.baseOffset) } Log.deleteSegmentFiles( - segmentsToDelete, + toDelete, asyncDelete = true, deleteProducerStateSnapshots = true, params.dir, diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 9a7b627492472..1dea5d054f33a 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -1494,4 +1494,45 @@ class LogLoaderTest { val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog) assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) } + + @Test + def testLogEndLessThanStartAfterReopen(): Unit = { + val logConfig = LogTestUtils.createLogConfig() + var log = createLog(logDir, logConfig) + for (i <- 0 until 5) { + val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) + log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) + log.roll() + } + assertEquals(6, log.logSegments.size) + + // Increment the log start offset + val startOffset = 4 + log.updateHighWatermark(log.logEndOffset) + log.maybeIncrementLogStartOffset(startOffset, ClientRecordDeletion) + assertTrue(log.logEndOffset > log.logStartOffset) + + // Append garbage to a segment below the current log start offset + val 
segmentToForceTruncation = log.logSegments.take(2).last + val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file)) + bw.write("corruptRecord") + bw.close() + log.close() + + // Reopen the log. This will truncate the segment to which we appended garbage and delete all other segments. + // All remaining segments will be lower than the current log start offset, which will force deletion of all segments + // and recreation of a single, active segment starting at logStartOffset. + log = createLog(logDir, logConfig, logStartOffset = startOffset, lastShutdownClean = false) + // Wait for segment deletions (if any) to complete. + mockTime.sleep(logConfig.fileDeleteDelayMs) + assertEquals(1, log.numberOfSegments) + assertEquals(startOffset, log.logStartOffset) + assertEquals(startOffset, log.logEndOffset) + // Validate that the remaining segment matches our expectations + val onlySegment = log.segments.firstSegment.get + assertEquals(startOffset, onlySegment.baseOffset) + assertTrue(onlySegment.log.file().exists()) + assertTrue(onlySegment.lazyOffsetIndex.file.exists()) + assertTrue(onlySegment.lazyTimeIndex.file.exists()) + } } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 4cc88d0f5873c..0113db1b152d4 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -747,39 +747,6 @@ class LogTest { assertEquals(500, log.logEndOffset) } - @Test - def testLogEndLessThanStartAfterReopen(): Unit = { - val logConfig = LogTestUtils.createLogConfig() - var log = createLog(logDir, logConfig) - for (i <- 0 until 5) { - val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) - log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) - log.roll() - } - assertEquals(6, log.logSegments.size) - - // Increment the log start offset - val startOffset = 4 - log.updateHighWatermark(log.logEndOffset) - 
log.maybeIncrementLogStartOffset(startOffset, ClientRecordDeletion) - assertTrue(log.logEndOffset > log.logStartOffset) - - // Append garbage to a segment below the current log start offset - val segmentToForceTruncation = log.logSegments.take(2).last - val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file)) - bw.write("corruptRecord") - bw.close() - log.close() - - // Reopen the log. This will cause truncate the segment to which we appended garbage and delete all other segments. - // All remaining segments will be lower than the current log start offset, which will force deletion of all segments - // and recreation of a single, active segment starting at logStartOffset. - log = createLog(logDir, logConfig, logStartOffset = startOffset, lastShutdownClean = false) - assertEquals(1, log.logSegments.size) - assertEquals(startOffset, log.logStartOffset) - assertEquals(startOffset, log.logEndOffset) - } - @Test def testNonActiveSegmentsFrom(): Unit = { val logConfig = LogTestUtils.createLogConfig()