Skip to content

Commit

Permalink
address JoshRosen's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lianhuiwang committed Dec 24, 2014
1 parent dcfef7d commit 5a73b04
Showing 1 changed file with 12 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
blockFailedCounts.synchronized {
if(blockFailedCounts.contains(blockIds)){
blockFailedCounts -= blockIds
}
blockFailedCounts -= blockIds
}

// SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty.
Expand Down Expand Up @@ -134,27 +132,21 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
future.onFailure { case exception =>
exception match {
case connectExcpt: IOException =>
logWarning("Failed to connect to " + hostName + ":" + port);
var isRetry:Boolean = false
var failedCount:Int = 1
blockFailedCounts.synchronized {
if(blockFailedCounts.contains(blockIds)){
failedCount = blockFailedCounts(blockIds)
failedCount += 1
}
if(failedCount >= maxRetryNum){
isRetry = false
}else{
isRetry = true
blockFailedCounts += ((blockIds, failedCount))
}
logWarning("Failed to connect to " + hostName + ":" + port)
val failedCount = blockFailedCounts.synchronized {
val newFailedCount = blockFailedCounts(blockIds).getOrElse(0) + 1
blockFailedCounts(blockIds) = newFailedCount
newFailedCount
}
if(isRetry){
fetchBlocks(hostName, port, blockIds, listener)
}else{
if (failedCount >= maxRetryNum) {
blockFailedCounts.synchronized {
blockFailedCounts -= blockIds
}
blockIds.foreach { blockId =>
listener.onBlockFetchFailure(blockId, connectExcpt)
}
} else {
fetchBlocks(hostName, port, blockIds, listener)
}
case t: Throwable =>
blockIds.foreach { blockId =>
Expand Down

0 comments on commit 5a73b04

Please sign in to comment.