From dcfef7d755c37f864e2c8fcdc86941553d042f84 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Sun, 2 Nov 2014 00:08:37 +0800 Subject: [PATCH 1/2] while fetchFailed's reason is connectionException, retry to fetch blocks's result --- .../network/nio/NioBlockTransferService.scala | 41 ++++++++++++++++++- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala index 11793ea92adb1..7d246ee4ffe90 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala @@ -18,6 +18,7 @@ package org.apache.spark.network.nio import java.nio.ByteBuffer +import java.io.IOException import org.apache.spark.network._ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} @@ -25,6 +26,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.util.Utils import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} +import scala.collection.mutable.HashMap import scala.concurrent.Future @@ -39,6 +41,10 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa private var blockDataManager: BlockDataManager = _ + private val blockFailedCounts = new HashMap[Seq[String], Int] + + val maxRetryNum = conf.getInt("spark.shuffle.fetch.maxRetryNumber", 3) + /** * Port number the service is listening on, available only after [[init]] is invoked. */ @@ -96,6 +102,11 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa future.onSuccess { case message => val bufferMessage = message.asInstanceOf[BufferMessage] val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) + blockFailedCounts.synchronized { + if(blockFailedCounts.contains(blockIds)){ + blockFailedCounts -= blockIds + } + } // SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty. if (blockMessageArray.isEmpty) { @@ -121,8 +132,34 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa }(cm.futureExecContext) future.onFailure { case exception => - blockIds.foreach { blockId => - listener.onBlockFetchFailure(blockId, exception) + exception match { + case connectExcpt: IOException => + logWarning("Failed to connect to " + hostName + ":" + port); + var isRetry:Boolean = false + var failedCount:Int = 1 + blockFailedCounts.synchronized { + if(blockFailedCounts.contains(blockIds)){ + failedCount = blockFailedCounts(blockIds) + failedCount += 1 + } + if(failedCount >= maxRetryNum){ + isRetry = false + }else{ + isRetry = true + blockFailedCounts += ((blockIds, failedCount)) + } + } + if(isRetry){ + fetchBlocks(hostName, port, blockIds, listener) + }else{ + blockIds.foreach { blockId => + listener.onBlockFetchFailure(blockId, connectExcpt) + } + } + case t: Throwable => + blockIds.foreach { blockId => + listener.onBlockFetchFailure(blockId, t) + } } }(cm.futureExecContext) } From 5a73b043327107f879cfefcff302422f135433dc Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Wed, 24 Dec 2014 11:55:59 +0800 Subject: [PATCH 2/2] address JoshRosen's comments --- .../network/nio/NioBlockTransferService.scala | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala index 7d246ee4ffe90..e3403c867e936 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala @@ -103,9 +103,7 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa val bufferMessage = message.asInstanceOf[BufferMessage] val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) blockFailedCounts.synchronized { - if(blockFailedCounts.contains(blockIds)){ - blockFailedCounts -= blockIds - } + blockFailedCounts -= blockIds } // SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty. @@ -134,27 +132,21 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa future.onFailure { case exception => exception match { case connectExcpt: IOException => - logWarning("Failed to connect to " + hostName + ":" + port); - var isRetry:Boolean = false - var failedCount:Int = 1 - blockFailedCounts.synchronized { - if(blockFailedCounts.contains(blockIds)){ - failedCount = blockFailedCounts(blockIds) - failedCount += 1 - } - if(failedCount >= maxRetryNum){ - isRetry = false - }else{ - isRetry = true - blockFailedCounts += ((blockIds, failedCount)) - } + logWarning("Failed to connect to " + hostName + ":" + port) + val failedCount = blockFailedCounts.synchronized { + val newFailedCount = blockFailedCounts(blockIds).getOrElse(0) + 1 + blockFailedCounts(blockIds) = newFailedCount + newFailedCount } - if(isRetry){ - fetchBlocks(hostName, port, blockIds, listener) - }else{ + if (failedCount >= maxRetryNum) { + blockFailedCounts.synchronized { + blockFailedCounts -= blockIds + } blockIds.foreach { blockId => listener.onBlockFetchFailure(blockId, connectExcpt) } + } else { + fetchBlocks(hostName, port, blockIds, listener) } case t: Throwable => blockIds.foreach { blockId =>