KAFKA-10723: Fix LogManager shutdown error handling #9596

Merged: 3 commits on Nov 19, 2020

Changes from 1 commit
42 changes: 28 additions & 14 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -477,27 +477,41 @@ class LogManager(logDirs: Seq[File],
       jobs(dir) = jobsForDir.map(pool.submit).toSeq
     }
 
+    var firstExceptionOpt: Option[Throwable] = Option.empty
     try {
       for ((dir, dirJobs) <- jobs) {
-        dirJobs.foreach(_.get)
+        val errorsForDirJobs = dirJobs.map {
+          future =>

Contributor:
Nit: this should be on the previous line.

Contributor Author:
Done.

+            try {

Contributor:
You can use scala.util.Try to wrap the call and get a Success or Failure.

Contributor Author:
Good idea, done.
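
For reference, a rough sketch of how the per-future error collection could look with scala.util.Try, as suggested above (a hypothetical illustration, not necessarily the code that landed in the follow-up commit):

```scala
import java.util.concurrent.{ExecutionException, Future}
import scala.util.{Failure, Try}

// Hypothetical sketch: wrap each blocking future.get in Try and collect the
// underlying cause of every failed shutdown job for one log directory.
def failuresIn(dirJobs: Seq[Future[_]]): Seq[Throwable] =
  dirJobs.map(job => Try(job.get)).collect {
    case Failure(e: ExecutionException) => e.getCause
    case Failure(e) => e
  }
```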

+              future.get
+              Option.empty
+            } catch {
+              case e: ExecutionException =>
+                error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
+                Some(e.getCause)
+            }
+        }.filter{ e => e.isDefined }.map{ e => e.get }
 
+        if (firstExceptionOpt.isEmpty) {
+          firstExceptionOpt = errorsForDirJobs.headOption
+        }
 
-        val logs = logsInDir(localLogsByDir, dir)
+        if (errorsForDirJobs.isEmpty) {
+          val logs = logsInDir(localLogsByDir, dir)
 
-        // update the last flush point
-        debug(s"Updating recovery points at $dir")
-        checkpointRecoveryOffsetsInDir(dir, logs)
+          // update the last flush point
+          debug(s"Updating recovery points at $dir")
+          checkpointRecoveryOffsetsInDir(dir, logs)
 
-        debug(s"Updating log start offsets at $dir")
-        checkpointLogStartOffsetsInDir(dir, logs)
+          debug(s"Updating log start offsets at $dir")
+          checkpointLogStartOffsetsInDir(dir, logs)
 
-        // mark that the shutdown was clean by creating marker file
-        debug(s"Writing clean shutdown marker at $dir")
-        CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
+          // mark that the shutdown was clean by creating marker file
+          debug(s"Writing clean shutdown marker at $dir")
+          CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
+        }
       }
-    } catch {
-      case e: ExecutionException =>
-        error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
-        throw e.getCause
+
+      firstExceptionOpt.foreach{ e => throw e}

Contributor:
Hmm, since we are about to shut down the JVM, should we just log a WARN here instead of throwing the exception?

Contributor Author:
Great point. I've changed the code to do that.
My understanding is that the exception-swallowing safety net exists inside KafkaServer.shutdown() today, but it makes sense to just log a warning here instead of relying on that safety net:

CoreUtils.swallow(logManager.shutdown(), this)
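
For illustration, a minimal sketch of what logging a warning instead of rethrowing might look like, assuming the warn helper from LogManager's Logging trait is in scope (the merged commit may differ in detail):

```scala
// Hypothetical alternative to rethrowing: since the JVM is about to exit and
// KafkaServer.shutdown() already swallows this exception, just log a warning.
firstExceptionOpt.foreach { e =>
  warn(s"There was an error in one of the threads during LogManager shutdown: $e")
}
```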

     } finally {
       threadPools.foreach(_.shutdown())
       // regardless of whether the close succeeded, we need to unlock the data directories
53 changes: 51 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -18,21 +18,24 @@
 package kafka.log
 
 import java.io._
+import java.nio.file.Files
 import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.MetricName
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.{FetchDataInfo, FetchLogEnd}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
-import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.apache.directory.api.util.FileUtils
+import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doAnswer, spy}
+import org.scalatest.Assertions.assertThrows
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -67,7 +70,8 @@ class LogManagerTest {
     logManager.shutdown()
     Utils.delete(logDir)
     // Some tests assign a new LogManager
-    logManager.liveLogDirs.foreach(Utils.delete)
+    if (logManager != null)
+      logManager.liveLogDirs.foreach(Utils.delete)
   }

   /**
@@ -83,6 +87,51 @@ class LogManagerTest {
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }

+  /**
+   * Tests that all internal futures are completed before LogManager.shutdown() returns to the
+   * caller during error situations.
+   */
+  @Test
+  def testHandlingExceptionsDuringShutdown(): Unit = {
+    logManager.shutdown()

Contributor:
Hmm, do we need this given that we do this in tearDown() already?

Contributor Author:
Yeah, this explicit shutdown is needed to:

  1. Re-create a new LogManager instance with multiple logDirs for this test. This is different from the default one provided in setUp().
  2. Help do some additional checks post shutdown (towards the end of this test).

Contributor Author:
Thinking about it again, you are right. I have eliminated the need for the shutdown() now by using a LogManager instance specific to the test.
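
As a rough sketch, a LogManager scoped to this test (as described above) might look like the following; the helpers createLogManager, TestUtils.tempDir and Utils.delete follow the existing test code, but the final committed version may differ:

```scala
// Hypothetical sketch: build a LogManager local to this test instead of
// shutting down and replacing the instance created in setUp().
val testLogDirs = Seq(TestUtils.tempDir(), TestUtils.tempDir())
val testLogManager = createLogManager(testLogDirs)
try {
  testLogManager.startup()
  // ... exercise the shutdown error-handling path ...
} finally {
  testLogDirs.foreach(Utils.delete)
}
```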


+    // We create two directories logDir1 and logDir2 to help effectively test error handling
+    // during LogManager.shutdown().
+    val logDir1 = TestUtils.tempDir()
+    val logDir2 = TestUtils.tempDir()
+    logManager = createLogManager(Seq(logDir1, logDir2))
+    assertEquals(2, logManager.liveLogDirs.size)
+    logManager.startup()
+
+    val log1 = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
+    val log2 = logManager.getOrCreateLog(new TopicPartition(name, 1), () => logConfig)
+
+    val logFile1 = new File(logDir1, name + "-0")
+    assertTrue(logFile1.exists)
+    val logFile2 = new File(logDir2, name + "-1")
+    assertTrue(logFile2.exists)
+
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log1.takeProducerSnapshot()
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+    log2.takeProducerSnapshot()
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+
+    // This should cause log1.close() to fail during the LogManager shutdown sequence.
+    FileUtils.deleteDirectory(logFile1)

Contributor:
If the end user deletes the log files manually, the server cannot be stopped, and they cannot start it up again. In that case, how do they resolve it?

Contributor Author:
Sorry I do not understand the question.

Contributor:
What if an error occurs during the shutdown of the broker? Should we log the error info or just throw the exception?

Contributor Author:
It depends on the kind of error, but we do log the error information to the log today from within KafkaServer.shutdown().
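
For context, the safety net mentioned above catches and logs exceptions rather than letting them propagate out of KafkaServer.shutdown(). A simplified sketch of that pattern (not the actual kafka.utils.CoreUtils.swallow implementation, which logs through Kafka's Logging trait):

```scala
import org.slf4j.Logger

// Simplified sketch of the "swallow" pattern: run an action and log any
// exception at WARN level instead of rethrowing it to the caller.
def swallow(action: => Unit, log: Logger): Unit =
  try action catch {
    case e: Throwable => log.warn("Swallowed exception during shutdown", e)
  }
```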


+    assertThrows[KafkaStorageException] {
+      logManager.shutdown()
+    }
+    assertFalse(Files.exists(new File(logDir1, Log.CleanShutdownFile).toPath))
+    assertTrue(Files.exists(new File(logDir2, Log.CleanShutdownFile).toPath))
+
+    logManager.liveLogDirs.foreach(Utils.delete)
+    logManager = null
+  }

   /**
    * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
    * The LogManager is configured with one invalid log directory which should be marked as offline.