Skip to content

Commit

Permalink
SPARK-3612. Executor shouldn't quit if heartbeat message fails to reach the driver
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Sep 22, 2014
1 parent a48956f commit 2b7353d
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.util.concurrent._

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
Expand Down Expand Up @@ -375,12 +376,17 @@ private[spark] class Executor(
}

val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
retryAttempts, retryIntervalMs, timeout)
if (response.reregisterBlockManager) {
logWarning("Told to re-register on heartbeat")
env.blockManager.reregister()
try {
val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
retryAttempts, retryIntervalMs, timeout)
if (response.reregisterBlockManager) {
logWarning("Told to re-register on heartbeat")
env.blockManager.reregister()
}
} catch {
case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
}

Thread.sleep(interval)
}
}
Expand Down

0 comments on commit 2b7353d

Please sign in to comment.