Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10723: Fix LogManager shutdown error handling #9596

Merged
merged 3 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],

try {
for ((dir, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
val hasErrors = dirJobs.exists {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks wrong. exists short-circuits. I think you want map followed by exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a really good point. Done.

future =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this should be in the previous line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use scala.util.Try to wrap the call and get a Success or Failure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done.

future.get
false
} catch {
case e: ExecutionException =>
warn(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
true
}
}

val logs = logsInDir(localLogsByDir, dir)
if (!hasErrors) {
val logs = logsInDir(localLogsByDir, dir)

// update the last flush point
debug(s"Updating recovery points at $dir")
checkpointRecoveryOffsetsInDir(dir, logs)
// update the last flush point
debug(s"Updating recovery points at $dir")
checkpointRecoveryOffsetsInDir(dir, logs)

debug(s"Updating log start offsets at $dir")
checkpointLogStartOffsetsInDir(dir, logs)
debug(s"Updating log start offsets at $dir")
checkpointLogStartOffsetsInDir(dir, logs)

// mark that the shutdown was clean by creating marker file
debug(s"Writing clean shutdown marker at $dir")
CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
// mark that the shutdown was clean by creating marker file
debug(s"Writing clean shutdown marker at $dir")
CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
}
}
} catch {
case e: ExecutionException =>
error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
throw e.getCause
} finally {
threadPools.foreach(_.shutdown())
// regardless of whether the close succeeded, we need to unlock the data directories
Expand Down
50 changes: 49 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package kafka.log

import java.io._
import java.nio.file.Files
import java.util.{Collections, Properties}

import com.yammer.metrics.core.MetricName
import kafka.metrics.KafkaYammerMetrics
import kafka.server.{FetchDataInfo, FetchLogEnd}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import org.apache.directory.api.util.FileUtils
import org.apache.kafka.common.errors.OffsetOutOfRangeException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, TopicPartition}
Expand Down Expand Up @@ -67,7 +69,8 @@ class LogManagerTest {
logManager.shutdown()
Utils.delete(logDir)
// Some tests assign a new LogManager
logManager.liveLogDirs.foreach(Utils.delete)
if (logManager != null)
logManager.liveLogDirs.foreach(Utils.delete)
}

/**
Expand All @@ -83,6 +86,51 @@ class LogManagerTest {
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
}

/**
 * Exercises LogManager.shutdown() when one of the logs cannot be closed cleanly.
 *
 * Two log directories are used, each expected to host one partition. The on-disk directory
 * of the first partition is deleted before shutdown; afterwards the clean shutdown marker
 * must be absent from the first log directory and present in the second one.
 */
@Test
def testHandlingExceptionsDuringShutdown(): Unit = {
  // Separate directories let a failure in one be compared against success in the other.
  val firstDir = TestUtils.tempDir()
  val secondDir = TestUtils.tempDir()
  val manager = createLogManager(Seq(firstDir, secondDir))
  try {
    assertEquals(2, manager.liveLogDirs.size)
    manager.startup()

    val firstLog = manager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
    val secondLog = manager.getOrCreateLog(new TopicPartition(name, 1), () => logConfig)

    // Partition 0 is expected under the first directory, partition 1 under the second.
    val firstLogFile = new File(firstDir, s"$name-0")
    assertTrue(firstLogFile.exists)
    val secondLogFile = new File(secondDir, s"$name-1")
    assertTrue(secondLogFile.exists)

    // Same write pattern for both logs: append, take a producer snapshot, append again.
    Seq(firstLog -> "test1", secondLog -> "test2").foreach { case (log, payload) =>
      log.appendAsLeader(TestUtils.singletonRecords(payload.getBytes()), leaderEpoch = 0)
      log.takeProducerSnapshot()
      log.appendAsLeader(TestUtils.singletonRecords(payload.getBytes()), leaderEpoch = 0)
    }

    // Removing the partition directory is intended to make closing the first log fail
    // during the LogManager shutdown sequence.
    FileUtils.deleteDirectory(firstLogFile)

    manager.shutdown()

    // Only the directory whose logs shut down without error gets the clean shutdown marker.
    val markerInFirstDir = new File(firstDir, Log.CleanShutdownFile).toPath
    val markerInSecondDir = new File(secondDir, Log.CleanShutdownFile).toPath
    assertFalse(Files.exists(markerInFirstDir))
    assertTrue(Files.exists(markerInSecondDir))
  } finally {
    // Remove whatever live log directories remain, regardless of the test outcome.
    manager.liveLogDirs.foreach(Utils.delete)
  }
}

/**
* Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
* The LogManager is configured with one invalid log directory which should be marked as offline.
Expand Down