[SPARK-4195][Core] Retry fetching blocks when the fetch fails due to a connection timeout #3061

Closed · wants to merge 2 commits
@@ -18,13 +18,15 @@
package org.apache.spark.network.nio

import java.nio.ByteBuffer
import java.io.IOException

import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}

import scala.collection.mutable.HashMap
import scala.concurrent.Future


@@ -39,6 +41,10 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa

private var blockDataManager: BlockDataManager = _

private val blockFailedCounts = new HashMap[Seq[String], Int]
Contributor:

To avoid memory leaks, we need to be sure that this won't grow without bound. Let me try to walk through the cases...

  • If no errors occur, this will remain empty since entries are only added on error.
  • If a fetch fails and a retry succeeds, then the entry is removed from this map.
  • If the maximum number of attempts is exceeded, we don't remove an entry from this map.

So, looks like this adds a memory leak?
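
For illustration, here is a minimal, self-contained sketch (not the PR's code) of a failure counter that removes its entry on every terminal outcome, so the map cannot grow without bound; the object and method names and the example block IDs are hypothetical:

```scala
import scala.collection.mutable.HashMap

// Sketch only: a bounded retry counter keyed like blockFailedCounts.
object FailureCounts {
  private val counts = new HashMap[Seq[String], Int]
  private val maxRetries = 3

  // Record a failure; return true if the caller should retry.
  // On the final failure the entry is removed so exhausted keys do not leak.
  def recordFailureAndShouldRetry(key: Seq[String]): Boolean = counts.synchronized {
    val n = counts.getOrElse(key, 0) + 1
    if (n >= maxRetries) {
      counts -= key      // terminal failure: clean up the entry
      false
    } else {
      counts(key) = n    // still retriable: remember the new count
      true
    }
  }

  // Called on a successful fetch so earlier failures are forgotten.
  def recordSuccess(key: Seq[String]): Unit = counts.synchronized {
    counts -= key
  }

  def main(args: Array[String]): Unit = {
    val blocks = Seq("shuffle_0_0_0", "shuffle_0_1_0") // hypothetical block IDs
    println(recordFailureAndShouldRetry(blocks)) // true  (1st failure)
    println(recordFailureAndShouldRetry(blocks)) // true  (2nd failure)
    println(recordFailureAndShouldRetry(blocks)) // false (3rd failure, entry removed)
    println(counts.isEmpty)                      // true: no leftover entries
  }
}
```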

Contributor Author:

Yes, thank you for catching this error.


val maxRetryNum = conf.getInt("spark.shuffle.fetch.maxRetryNumber", 3)

/**
* Port number the service is listening on, available only after [[init]] is invoked.
*/
@@ -96,6 +102,9 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
future.onSuccess { case message =>
val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
blockFailedCounts.synchronized {
blockFailedCounts -= blockIds
}

// SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty.
if (blockMessageArray.isEmpty) {
@@ -121,8 +130,28 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
}(cm.futureExecContext)

future.onFailure { case exception =>
blockIds.foreach { blockId =>
listener.onBlockFetchFailure(blockId, exception)
exception match {
case connectExcpt: IOException =>
Contributor:

Why not catch ConnectException? I suppose it's always safe to retry as long as the number of retries is bounded, but it's probably better to catch the narrower exception if we're only trying to deal with connection establishment errors.

Contributor Author:

Because the code at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala#L963 does not catch ConnectException but throws an IOException, we only catch IOException here. If we want to handle only connection errors, ConnectionManager.scala would need to catch ConnectException.
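
For reference, a standalone sketch (not part of this patch) of how the retry decision could be narrowed to java.net.ConnectException, which is a subclass of IOException; the helper name isRetriable is hypothetical and assumes ConnectionManager were changed to surface the narrower exception:

```scala
import java.io.IOException
import java.net.ConnectException

object RetryClassifier {
  // Hypothetical helper: decide whether a fetch failure is worth retrying.
  // ConnectException covers the "connection could not be established" case
  // discussed above; other IOExceptions are reported to the listener instead.
  def isRetriable(t: Throwable): Boolean = t match {
    case _: ConnectException => true
    case _: IOException      => false
    case _                   => false
  }

  def main(args: Array[String]): Unit = {
    println(isRetriable(new ConnectException("Connection refused"))) // true
    println(isRetriable(new IOException("broken pipe")))             // false
  }
}
```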

logWarning("Failed to connect to " + hostName + ":" + port)
val failedCount = blockFailedCounts.synchronized {
val newFailedCount = blockFailedCounts.getOrElse(blockIds, 0) + 1
blockFailedCounts(blockIds) = newFailedCount
newFailedCount
}
if (failedCount >= maxRetryNum) {
blockFailedCounts.synchronized {
blockFailedCounts -= blockIds
}
blockIds.foreach { blockId =>
listener.onBlockFetchFailure(blockId, connectExcpt)
}
} else {
fetchBlocks(hostName, port, blockIds, listener)
}
case t: Throwable =>
blockIds.foreach { blockId =>
listener.onBlockFetchFailure(blockId, t)
}
}
}(cm.futureExecContext)
}
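
As a usage note, a hypothetical sketch of tuning the retry limit this patch reads; spark.shuffle.fetch.maxRetryNumber is the key introduced in this diff and is not a standard Spark setting outside it:

```scala
import org.apache.spark.SparkConf

object FetchRetryConfExample {
  def main(args: Array[String]): Unit = {
    // Raise the bound read by NioBlockTransferService from its default of 3.
    val conf = new SparkConf()
      .setAppName("fetch-retry-example")
      .set("spark.shuffle.fetch.maxRetryNumber", "5")

    // The same lookup the patch performs: getInt with a default of 3.
    println(conf.getInt("spark.shuffle.fetch.maxRetryNumber", 3)) // prints 5
  }
}
```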