From 343bf8b4815332925116ace7217d90a641dfa96f Mon Sep 17 00:00:00 2001
From: Devesh Agrawal
Date: Sat, 15 Aug 2020 12:04:35 -0700
Subject: [PATCH] Switch to using CacheBuilder instead of home grown expiration

---
 .../spark/scheduler/TaskSchedulerImpl.scala   | 40 ++++++++++---------
 .../scheduler/TaskSchedulerImplSuite.scala    | 10 ++---
 2 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 3bdd1b5baa53a..6501a75508db8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -18,7 +18,6 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
-import java.util
 import java.util.{Timer, TimerTask}
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
@@ -27,6 +26,9 @@ import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet}
 import scala.util.Random
 
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.ExecutorMetrics
@@ -137,9 +139,21 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
+  // We add executors here when we first get decommission notification for them. Executors can
+  // continue to run even after being asked to decommission, but they will eventually exit.
   val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
-  // map of second to list of executors to clear form the above map
-  val decommissioningExecutorsToGc = new util.TreeMap[Long, mutable.ArrayBuffer[String]]()
+
+  // When they exit and we know of that via heartbeat failure, we will add them to this cache.
+  // This cache is consulted to know if a fetch failure is because a source executor was
+  // decommissioned.
+  lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
+    .expireAfterWrite(
+      conf.getLong("spark.decommissioningRememberAfterRemoval.seconds", 60L), TimeUnit.SECONDS)
+    .ticker(new Ticker{
+      override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
+    })
+    .build[String, ExecutorDecommissionInfo]()
+    .asMap()
 
   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
@@ -924,13 +938,9 @@ private[spark] class TaskSchedulerImpl(
 
   override def getExecutorDecommissionInfo(executorId: String)
     : Option[ExecutorDecommissionInfo] = synchronized {
-      import scala.collection.JavaConverters._
-      // Garbage collect old decommissioning entries
-      val secondsToGcUptil = TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis())
-      val headMap = decommissioningExecutorsToGc.headMap(secondsToGcUptil)
-      headMap.values().asScala.flatten.foreach(executorsPendingDecommission -= _)
-      headMap.clear()
-      executorsPendingDecommission.get(executorId)
+      executorsPendingDecommission
+        .get(executorId)
+        .orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
   }
 
   override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
@@ -1037,14 +1047,8 @@ private[spark] class TaskSchedulerImpl(
       }
     }
 
-    val decomInfo = executorsPendingDecommission.get(executorId)
-    if (decomInfo.isDefined) {
-      val rememberSeconds =
-        conf.getInt("spark.decommissioningRememberAfterRemoval.seconds", 60)
-      val gcSecond = TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis()) + rememberSeconds
-      decommissioningExecutorsToGc.computeIfAbsent(gcSecond, _ => mutable.ArrayBuffer.empty) +=
-        executorId
-    }
+    val decomInfo = executorsPendingDecommission.remove(executorId)
+    decomInfo.foreach(decommissionedExecutorsRemoved.put(executorId, _))
 
     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 75d890de62038..93b357791e8c3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1855,20 +1855,18 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined)
     clock.advance(2000)
     scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
-    assert(scheduler.decommissioningExecutorsToGc.size === 1)
-    assert(scheduler.executorsPendingDecommission.size === 1)
+    assert(scheduler.decommissionedExecutorsRemoved.size === 1)
+    assert(scheduler.executorsPendingDecommission.isEmpty)
     clock.advance(2000)
     // It hasn't been 60 seconds yet before removal
     assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined)
     scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
     clock.advance(2000)
-    assert(scheduler.decommissioningExecutorsToGc.size === 1)
-    assert(scheduler.executorsPendingDecommission.size === 1)
+    assert(scheduler.decommissionedExecutorsRemoved.size === 1)
     assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined)
     clock.advance(61000)
     assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty)
-    assert(scheduler.decommissioningExecutorsToGc.isEmpty)
-    assert(scheduler.executorsPendingDecommission.isEmpty)
+    assert(scheduler.decommissionedExecutorsRemoved.isEmpty)
   }
 
   /**
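
Note (not part of the patch above): a minimal, standalone sketch of the Guava CacheBuilder
plus custom Ticker expiry pattern this change adopts. Entries written to the cache stop being
visible to reads once the configured window elapses, with time supplied by a pluggable Ticker
so a test clock can drive expiry instead of the wall clock. The nowMillis variable is an
illustrative stand-in for the scheduler's clock, the String key/value types are simplified from
the real ExecutorDecommissionInfo usage, and Guava is assumed to be on the classpath (it ships
with Spark core).

import java.util.concurrent.TimeUnit

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder

object ExpiringCacheSketch {
  def main(args: Array[String]): Unit = {
    // Manually advanced "clock", standing in for Spark's Clock / ManualClock.
    var nowMillis: Long = 0L

    // Entries expire 60 seconds after being written; time comes from the
    // custom Ticker rather than the wall clock, so advancing nowMillis is
    // enough to age entries out.
    val cache = CacheBuilder.newBuilder()
      .expireAfterWrite(60L, TimeUnit.SECONDS)
      .ticker(new Ticker {
        override def read(): Long = TimeUnit.MILLISECONDS.toNanos(nowMillis)
      })
      .build[String, String]()

    cache.put("executor1", "decommissioned")

    nowMillis += 2000L
    // Within the 60 second window: the entry is still visible to reads.
    assert(cache.getIfPresent("executor1") == "decommissioned")

    nowMillis += 61000L
    // Past the window: expiry is enforced at read time and null comes back.
    assert(cache.getIfPresent("executor1") == null)
  }
}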