Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10723: Fix LogManager shutdown error handling #9596

Merged
merged 3 commits into from
Nov 19, 2020

Conversation

kowshik
Copy link
Contributor

@kowshik kowshik commented Nov 15, 2020

The asynchronous shutdown in LogManager has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before LogManager.shutdown returns. This is because, this line in the finally clause shuts down the thread pools asynchronously. As a result, despite the shut down completed message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close some logs. This is misleading during debugging. Also sometimes it introduces an avoidable post-shutdown activity where resources (such as file handles) are released or persistent state is checkpointed in the Broker.

In this PR, we fix the above behavior such that we prevent leakage of threads. If any of the futures throw an error, we skip creating of checkpoint and clean shutdown file only for the affected log directory. We continue to wait for all futures to complete for all the directories.

Test plan:

Added a new unit test: LogManager.testHandlingExceptionsDuringShutdown.

@kowshik kowshik force-pushed the KAFKA-10723_LogManager_shutdown_fix branch from 89243a3 to 3a36b54 Compare November 15, 2020 01:59
@kowshik
Copy link
Contributor Author

kowshik commented Nov 15, 2020

cc @dhruvilshah3 @junrao @hachikuji for review

log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)

// This should cause log1.close() to fail during LogManger shutdown sequence.
FileUtils.deleteDirectory(logFile1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the end user delete the log files Manually , the server cannot be stopped. and The cannot startup it again? so in this case ,how do they resolve it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I do not understand the question.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if error occur during the shutdown of the broker ? should we log the error info to the log or just throw the exception ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on the kind of error, but we do log the error information to the log today from within KafkaServer.shutdown().

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kowshik : Thanks for the PR. A couple of comments below.

case e: ExecutionException =>
error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
throw e.getCause
firstExceptionOpt.foreach{ e => throw e}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, since we are about to shut down the JVM, should we just log a WARN here instead of throwing the exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point. I've changed the code to do the same.
My understanding is that the exception swallow safety net exists inside KafkaServer.shutdown() today, but it makes sense to also just log a warning here instead instead of relying on the safety net:

CoreUtils.swallow(logManager.shutdown(), this)
.

*/
@Test
def testHandlingExceptionsDuringShutdown(): Unit = {
logManager.shutdown()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, do we need this given that we do this in tearDown() already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this explicit shutdown is needed to:

  1. Re-create a new LogManager instance with multiple logDirs for this test. This is different from the default one provided in setUp().
  2. Help do some additional checks post shutdown (towards the end of this test).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it again, you are right. I have eliminated the need for the shutdown() now by using a LogManager instance specific to the test.

@kowshik kowshik force-pushed the KAFKA-10723_LogManager_shutdown_fix branch from 71cbf59 to f917f0c Compare November 17, 2020 02:52
@kowshik kowshik requested a review from junrao November 17, 2020 03:00
@kowshik
Copy link
Contributor Author

kowshik commented Nov 17, 2020

Thanks for the review @junrao! I have addressed the comments in f917f0c.

@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],

try {
for ((dir, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
val hasErrors = dirJobs.exists {
future =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this should be in the previous line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],

try {
for ((dir, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
val hasErrors = dirJobs.exists {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks wrong. exists short-circuits. I think you want map followed by exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats a really good point. Done.

dirJobs.foreach(_.get)
val hasErrors = dirJobs.exists {
future =>
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use scala.util.Try to wrap the call and get a Success or Failure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done.

@kowshik
Copy link
Contributor Author

kowshik commented Nov 18, 2020

Thanks for the review @ijuma ! I have addressed the comments in 8716429 .

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kowshik : Thanks for the latest PR. LGTM

@junrao junrao merged commit dcbd28d into apache:trunk Nov 19, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants