Skip to content

Commit

Permalink
KAFKA-12955: Fix LogLoader to pass materialized view of segments for …
Browse files Browse the repository at this point in the history
…deletion (#10888)

Within LogLoader.removeAndDeleteSegmentsAsync(), we should force materialization of the segmentsToDelete iterable, to make sure the results of the iteration remain valid and deterministic. We should also pass only the materialized view to the logic that deletes the segments, as otherwise we could end up deleting the wrong segments.

Reviewers: Jun Rao <junrao@gmail.com>
  • Loading branch information
kowshik authored Jun 16, 2021
1 parent 96767a6 commit fc5245d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 43 deletions.
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPar

import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Seq, mutable}
import scala.collection.{Seq, immutable, mutable}

object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(None, -1, None, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
Expand Down Expand Up @@ -1879,9 +1879,10 @@ class Log(@volatile private var _dir: File,
reason: SegmentDeletionReason): Unit = {
if (segments.nonEmpty) {
lock synchronized {
// As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
// Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
// removing the deleted segment, we should force materialization of the iterator here, so that results of the
// iteration remain valid and deterministic.
// iteration remain valid and deterministic. We should also pass only the materialized view of the
// iterator to the logic that actually deletes the segments.
val toDelete = segments.toList
reason.logReason(this, toDelete)
toDelete.foreach { segment =>
Expand All @@ -1892,7 +1893,7 @@ class Log(@volatile private var _dir: File,
}
}

private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
private def deleteSegmentFiles(segments: immutable.Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
Log.deleteSegmentFiles(segments, asyncDelete, deleteProducerStateSnapshots, dir, topicPartition,
config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
}
Expand Down Expand Up @@ -2378,7 +2379,7 @@ object Log extends Logging {
* @param logPrefix The logging prefix
* @throws IOException if the file can't be renamed and still exists
*/
private[log] def deleteSegmentFiles(segmentsToDelete: Iterable[LogSegment],
private[log] def deleteSegmentFiles(segmentsToDelete: immutable.Iterable[LogSegment],
asyncDelete: Boolean,
deleteProducerStateSnapshots: Boolean = true,
dir: File,
Expand Down
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/log/LogLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -510,17 +510,18 @@ object LogLoader extends Logging {
private def removeAndDeleteSegmentsAsync(segmentsToDelete: Iterable[LogSegment],
params: LoadLogParams): Unit = {
if (segmentsToDelete.nonEmpty) {
// As most callers hold an iterator into the `params.segments` collection and
// `removeAndDeleteSegmentAsync` mutates it by removing the deleted segment, we should force
// materialization of the iterator here, so that results of the iteration remain valid and
// deterministic.
// Most callers hold an iterator into the `params.segments` collection and
// `removeAndDeleteSegmentAsync` mutates it by removing the deleted segment. Therefore,
// we should force materialization of the iterator here, so that results of the iteration
// remain valid and deterministic. We should also pass only the materialized view of the
// iterator to the logic that deletes the segments.
val toDelete = segmentsToDelete.toList
info(s"${params.logIdentifier}Deleting segments as part of log recovery: ${toDelete.mkString(",")}")
toDelete.foreach { segment =>
params.segments.remove(segment.baseOffset)
}
Log.deleteSegmentFiles(
segmentsToDelete,
toDelete,
asyncDelete = true,
deleteProducerStateSnapshots = true,
params.dir,
Expand Down
41 changes: 41 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1494,4 +1494,45 @@ class LogLoaderTest {
val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
}

@Test
def testLogEndLessThanStartAfterReopen(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
var log = createLog(logDir, logConfig)
for (i <- 0 until 5) {
val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
log.roll()
}
assertEquals(6, log.logSegments.size)

// Increment the log start offset
val startOffset = 4
log.updateHighWatermark(log.logEndOffset)
log.maybeIncrementLogStartOffset(startOffset, ClientRecordDeletion)
assertTrue(log.logEndOffset > log.logStartOffset)

// Append garbage to a segment below the current log start offset
val segmentToForceTruncation = log.logSegments.take(2).last
val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file))
bw.write("corruptRecord")
bw.close()
log.close()

// Reopen the log. This will cause truncate the segment to which we appended garbage and delete all other segments.
// All remaining segments will be lower than the current log start offset, which will force deletion of all segments
// and recreation of a single, active segment starting at logStartOffset.
log = createLog(logDir, logConfig, logStartOffset = startOffset, lastShutdownClean = false)
// Wait for segment deletions (if any) to complete.
mockTime.sleep(logConfig.fileDeleteDelayMs)
assertEquals(1, log.numberOfSegments)
assertEquals(startOffset, log.logStartOffset)
assertEquals(startOffset, log.logEndOffset)
// Validate that the remaining segment matches our expectations
val onlySegment = log.segments.firstSegment.get
assertEquals(startOffset, onlySegment.baseOffset)
assertTrue(onlySegment.log.file().exists())
assertTrue(onlySegment.lazyOffsetIndex.file.exists())
assertTrue(onlySegment.lazyTimeIndex.file.exists())
}
}
33 changes: 0 additions & 33 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -747,39 +747,6 @@ class LogTest {
assertEquals(500, log.logEndOffset)
}

@Test
def testLogEndLessThanStartAfterReopen(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
var log = createLog(logDir, logConfig)
for (i <- 0 until 5) {
val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
log.roll()
}
assertEquals(6, log.logSegments.size)

// Increment the log start offset
val startOffset = 4
log.updateHighWatermark(log.logEndOffset)
log.maybeIncrementLogStartOffset(startOffset, ClientRecordDeletion)
assertTrue(log.logEndOffset > log.logStartOffset)

// Append garbage to a segment below the current log start offset
val segmentToForceTruncation = log.logSegments.take(2).last
val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file))
bw.write("corruptRecord")
bw.close()
log.close()

// Reopen the log. This will cause truncate the segment to which we appended garbage and delete all other segments.
// All remaining segments will be lower than the current log start offset, which will force deletion of all segments
// and recreation of a single, active segment starting at logStartOffset.
log = createLog(logDir, logConfig, logStartOffset = startOffset, lastShutdownClean = false)
assertEquals(1, log.logSegments.size)
assertEquals(startOffset, log.logStartOffset)
assertEquals(startOffset, log.logEndOffset)
}

@Test
def testNonActiveSegmentsFrom(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
Expand Down

0 comments on commit fc5245d

Please sign in to comment.