diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala index 7d246ee4ffe90..e3403c867e936 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala @@ -103,9 +103,7 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa val bufferMessage = message.asInstanceOf[BufferMessage] val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) blockFailedCounts.synchronized { - if(blockFailedCounts.contains(blockIds)){ - blockFailedCounts -= blockIds - } + blockFailedCounts -= blockIds } // SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty. @@ -134,27 +132,21 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa future.onFailure { case exception => exception match { case connectExcpt: IOException => - logWarning("Failed to connect to " + hostName + ":" + port); - var isRetry:Boolean = false - var failedCount:Int = 1 - blockFailedCounts.synchronized { - if(blockFailedCounts.contains(blockIds)){ - failedCount = blockFailedCounts(blockIds) - failedCount += 1 - } - if(failedCount >= maxRetryNum){ - isRetry = false - }else{ - isRetry = true - blockFailedCounts += ((blockIds, failedCount)) - } + logWarning("Failed to connect to " + hostName + ":" + port) + val failedCount = blockFailedCounts.synchronized { + val newFailedCount = blockFailedCounts(blockIds).getOrElse(0) + 1 + blockFailedCounts(blockIds) = newFailedCount + newFailedCount } - if(isRetry){ - fetchBlocks(hostName, port, blockIds, listener) - }else{ + if (failedCount >= maxRetryNum) { + blockFailedCounts.synchronized { + blockFailedCounts -= blockIds + } blockIds.foreach { blockId => listener.onBlockFetchFailure(blockId, connectExcpt) } + } else { + fetchBlocks(hostName, port, blockIds, listener) } case t: Throwable => blockIds.foreach { blockId =>