Skip to content

Commit

Permalink
KAFKA-10723: Fix LogManager shutdown error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
kowshik committed Nov 15, 2020
1 parent cb3dc67 commit 3a36b54
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 16 deletions.
42 changes: 28 additions & 14 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -477,27 +477,41 @@ class LogManager(logDirs: Seq[File],
jobs(dir) = jobsForDir.map(pool.submit).toSeq
}

var firstExceptionOpt: Option[Throwable] = Option.empty
try {
for ((dir, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
val errorsForDirJobs = dirJobs.map {
future =>
try {
future.get
Option.empty
} catch {
case e: ExecutionException =>
error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
Some(e.getCause)
}
}.filter{ e => e.isDefined }.map{ e => e.get }

if (firstExceptionOpt.isEmpty) {
firstExceptionOpt = errorsForDirJobs.headOption
}

val logs = logsInDir(localLogsByDir, dir)
if (errorsForDirJobs.isEmpty) {
val logs = logsInDir(localLogsByDir, dir)

// update the last flush point
debug(s"Updating recovery points at $dir")
checkpointRecoveryOffsetsInDir(dir, logs)
// update the last flush point
debug(s"Updating recovery points at $dir")
checkpointRecoveryOffsetsInDir(dir, logs)

debug(s"Updating log start offsets at $dir")
checkpointLogStartOffsetsInDir(dir, logs)
debug(s"Updating log start offsets at $dir")
checkpointLogStartOffsetsInDir(dir, logs)

// mark that the shutdown was clean by creating marker file
debug(s"Writing clean shutdown marker at $dir")
CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
// mark that the shutdown was clean by creating marker file
debug(s"Writing clean shutdown marker at $dir")
CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
}
}
} catch {
case e: ExecutionException =>
error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
throw e.getCause
firstExceptionOpt.foreach{ e => throw e}
} finally {
threadPools.foreach(_.shutdown())
// regardless of whether the close succeeded, we need to unlock the data directories
Expand Down
53 changes: 51 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,24 @@
package kafka.log

import java.io._
import java.nio.file.Files
import java.util.{Collections, Properties}

import com.yammer.metrics.core.MetricName
import kafka.metrics.KafkaYammerMetrics
import kafka.server.{FetchDataInfo, FetchLogEnd}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import org.apache.kafka.common.errors.OffsetOutOfRangeException
import org.apache.directory.api.util.FileUtils
import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doAnswer, spy}
import org.scalatest.Assertions.assertThrows

import scala.collection.mutable
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -67,7 +70,8 @@ class LogManagerTest {
logManager.shutdown()
Utils.delete(logDir)
// Some tests assign a new LogManager
logManager.liveLogDirs.foreach(Utils.delete)
if (logManager != null)
logManager.liveLogDirs.foreach(Utils.delete)
}

/**
Expand All @@ -83,6 +87,51 @@ class LogManagerTest {
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
}

/**
 * Exercises the error path of LogManager.shutdown() with two log directories, one of which
 * has had a partition directory deleted underneath it. The test expects shutdown() to throw
 * a KafkaStorageException, expects no clean shutdown marker in the directory whose partition
 * was deleted, and expects the marker to be present in the other directory.
 */
@Test
def testHandlingExceptionsDuringShutdown(): Unit = {
  // Path of the clean shutdown marker file inside the given log directory.
  def cleanShutdownMarker(dir: File) = new File(dir, Log.CleanShutdownFile).toPath

  // Shut down the LogManager currently held by the fixture field before replacing it.
  logManager.shutdown()

  // Two separate directories are used so that a failure in one of them can be
  // contrasted with a successful shutdown of the other.
  val firstDir = TestUtils.tempDir()
  val secondDir = TestUtils.tempDir()
  logManager = createLogManager(Seq(firstDir, secondDir))
  assertEquals(2, logManager.liveLogDirs.size)
  logManager.startup()

  val firstLog = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
  val secondLog = logManager.getOrCreateLog(new TopicPartition(name, 1), () => logConfig)

  // The assertions below pin partition 0 to the first directory and partition 1 to the second.
  val firstPartitionDir = new File(firstDir, name + "-0")
  assertTrue(firstPartitionDir.exists)
  val secondPartitionDir = new File(secondDir, name + "-1")
  assertTrue(secondPartitionDir.exists)

  // For each log, in order: append a record, take a producer snapshot, append again.
  Seq(firstLog -> "test1", secondLog -> "test2").foreach { case (log, payload) =>
    log.appendAsLeader(TestUtils.singletonRecords(payload.getBytes()), leaderEpoch = 0)
    log.takeProducerSnapshot()
    log.appendAsLeader(TestUtils.singletonRecords(payload.getBytes()), leaderEpoch = 0)
  }

  // Removing the partition directory should cause the first log's close() to fail during
  // the LogManager shutdown sequence.
  FileUtils.deleteDirectory(firstPartitionDir)

  assertThrows[KafkaStorageException](logManager.shutdown())

  // Only the directory that shut down without errors may carry the marker.
  assertFalse(Files.exists(cleanShutdownMarker(firstDir)))
  assertTrue(Files.exists(cleanShutdownMarker(secondDir)))

  // Remove the directories created by this test and clear the field.
  logManager.liveLogDirs.foreach(Utils.delete)
  logManager = null
}

/**
* Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
* The LogManager is configured with one invalid log directory which should be marked as offline.
Expand Down

0 comments on commit 3a36b54

Please sign in to comment.