Skip to content

Commit

Permalink
[SPARK-12411] [CORE] Decrease executor heartbeat timeout to match hea…
Browse files Browse the repository at this point in the history
…rtbeat interval.

Previously, the rpc timeout was the default network timeout, which is the same value
the driver uses to determine dead executors. This means if there is a network issue,
the executor is determined dead after one heartbeat attempt. There is a separate config
for the heartbeat interval which is a better value to use for the heartbeat RPC. With
this change, the executor will make multiple heartbeat attempts even with RPC issues.
  • Loading branch information
nongli committed Dec 17, 2015
1 parent 8184568 commit c899481
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
Expand Down Expand Up @@ -445,7 +446,8 @@ private[spark] class Executor(

val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
try {
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
if (response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
Expand Down

0 comments on commit c899481

Please sign in to comment.