diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 9385f557c4614..4e7bf51fc0622 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -80,16 +80,16 @@ private[spark] class ExecutorAllocationManager( Integer.MAX_VALUE) // How long there must be backlogged tasks for before an addition is triggered (seconds) - private val schedulerBacklogTimeout = conf.getLong( - "spark.dynamicAllocation.schedulerBacklogTimeout", 5) + private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds( + "spark.dynamicAllocation.schedulerBacklogTimeout", "5s") - // Same as above, but used only after `schedulerBacklogTimeout` is exceeded - private val sustainedSchedulerBacklogTimeout = conf.getLong( - "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout) + // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded + private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds( + "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s") // How long an executor must be idle for before it is removed (seconds) - private val executorIdleTimeout = conf.getLong( - "spark.dynamicAllocation.executorIdleTimeout", 600) + private val executorIdleTimeoutS = conf.getTimeAsSeconds( + "spark.dynamicAllocation.executorIdleTimeout", "600s") // During testing, the methods to actually kill and add executors are mocked out private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false) @@ -150,14 +150,14 @@ private[spark] class ExecutorAllocationManager( throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " + s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!") } - if (schedulerBacklogTimeout <= 0) { + if (schedulerBacklogTimeoutS <= 0) { throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!") } - if (sustainedSchedulerBacklogTimeout <= 0) { + if (sustainedSchedulerBacklogTimeoutS <= 0) { throw new SparkException( "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!") } - if (executorIdleTimeout <= 0) { + if (executorIdleTimeoutS <= 0) { throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!") } // Require external shuffle service for dynamic allocation @@ -262,8 +262,8 @@ private[spark] class ExecutorAllocationManager( } else if (addTime != NOT_SET && now >= addTime) { val delta = addExecutors(maxNeeded) logDebug(s"Starting timer to add more executors (to " + - s"expire in $sustainedSchedulerBacklogTimeout seconds)") - addTime += sustainedSchedulerBacklogTimeout * 1000 + s"expire in $sustainedSchedulerBacklogTimeoutS seconds)") + addTime += sustainedSchedulerBacklogTimeoutS * 1000 delta } else { 0 @@ -351,7 +351,7 @@ private[spark] class ExecutorAllocationManager( val removeRequestAcknowledged = testing || client.killExecutor(executorId) if (removeRequestAcknowledged) { logInfo(s"Removing executor $executorId because it has been idle for " + - s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})") + s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})") executorsPendingToRemove.add(executorId) true } else { @@ -407,8 +407,8 @@ private[spark] class ExecutorAllocationManager( private def onSchedulerBacklogged(): Unit = synchronized { if (addTime == NOT_SET) { logDebug(s"Starting timer to add executors because pending tasks " + - s"are building up (to expire in $schedulerBacklogTimeout seconds)") - addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000 + s"are building up (to expire in $schedulerBacklogTimeoutS seconds)") + addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000 } } @@ -431,8 +431,8 @@ private[spark] class ExecutorAllocationManager( if (executorIds.contains(executorId)) { if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) { logDebug(s"Starting idle timer for $executorId because there are no more tasks " + - s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)") - removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000 + s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)") + removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000 } } else { logWarning(s"Attempted to mark unknown executor $executorId idle") diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 5871b8c869f03..e3bd16f1cbf24 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -62,14 +62,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext) // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses // "milliseconds" - private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout").map(_.toLong * 1000). - getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120000)) - + private val slaveTimeoutMs = + sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s") + private val executorTimeoutMs = + sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000 + // "spark.network.timeoutInterval" uses "seconds", while // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds" - private val checkTimeoutIntervalMs = - sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000). - getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)) + private val timeoutIntervalMs = + sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s") + private val checkTimeoutIntervalMs = + sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000 private var timeoutCheckingTask: ScheduledFuture[_] = null diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 0c123c96b8d7b..390e631647bd6 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -174,6 +174,42 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { getOption(key).getOrElse(defaultValue) } + /** + * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no + * suffix is provided then seconds are assumed. + * @throws NoSuchElementException + */ + def getTimeAsSeconds(key: String): Long = { + Utils.timeStringAsSeconds(get(key)) + } + + /** + * Get a time parameter as seconds, falling back to a default if not set. If no + * suffix is provided then seconds are assumed. + * + */ + def getTimeAsSeconds(key: String, defaultValue: String): Long = { + Utils.timeStringAsSeconds(get(key, defaultValue)) + } + + /** + * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no + * suffix is provided then milliseconds are assumed. + * @throws NoSuchElementException + */ + def getTimeAsMs(key: String): Long = { + Utils.timeStringAsMs(get(key)) + } + + /** + * Get a time parameter as milliseconds, falling back to a default if not set. If no + * suffix is provided then milliseconds are assumed. + */ + def getTimeAsMs(key: String, defaultValue: String): Long = { + Utils.timeStringAsMs(get(key, defaultValue)) + } + + /** Get a parameter as an Option */ def getOption(key: String): Option[String] = { Option(settings.get(key)) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 14f99a464b6e9..516f619529c48 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -436,14 +436,14 @@ private[spark] class Executor( * This thread stops running when the executor is stopped. */ private def startDriverHeartbeater(): Unit = { - val interval = conf.getInt("spark.executor.heartbeatInterval", 10000) + val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s") val thread = new Thread() { override def run() { // Sleep a random interval so the heartbeats don't end up in sync - Thread.sleep(interval + (math.random * interval).asInstanceOf[Int]) + Thread.sleep(intervalMs + (math.random * intervalMs).asInstanceOf[Int]) while (!isStopped) { reportHeartBeat() - Thread.sleep(interval) + Thread.sleep(intervalMs) } } } diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 741fe3e1ea750..8e3c30fc3d781 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -82,7 +82,8 @@ private[nio] class ConnectionManager( new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor")) private val ackTimeout = - conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 120)) + conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout", + conf.get("spark.network.timeout", "120s")) // Get the thread counts from the Spark Configuration. // diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 076b36e86c0ce..2362cc7240039 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -62,10 +62,10 @@ private[spark] class TaskSchedulerImpl( val conf = sc.conf // How often to check for speculative tasks - val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100) + val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms") // Threshold above which we warn user initial TaskSet may be starved - val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000) + val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s") // CPUs to request per task val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) @@ -143,8 +143,8 @@ private[spark] class TaskSchedulerImpl( if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatcher - sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, - SPECULATION_INTERVAL milliseconds) { + sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds, + SPECULATION_INTERVAL_MS milliseconds) { Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() } } } @@ -173,7 +173,7 @@ private[spark] class TaskSchedulerImpl( this.cancel() } } - }, STARVATION_TIMEOUT, STARVATION_TIMEOUT) + }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index d509881c74fef..7dc325283d961 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -848,15 +848,18 @@ private[spark] class TaskSetManager( } private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { - val defaultWait = conf.get("spark.locality.wait", "3000") - level match { - case TaskLocality.PROCESS_LOCAL => - conf.get("spark.locality.wait.process", defaultWait).toLong - case TaskLocality.NODE_LOCAL => - conf.get("spark.locality.wait.node", defaultWait).toLong - case TaskLocality.RACK_LOCAL => - conf.get("spark.locality.wait.rack", defaultWait).toLong - case _ => 0L + val defaultWait = conf.get("spark.locality.wait", "3s") + val localityWaitKey = level match { + case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process" + case TaskLocality.NODE_LOCAL => "spark.locality.wait.node" + case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack" + case _ => null + } + + if (localityWaitKey != null) { + conf.getTimeAsMs(localityWaitKey, defaultWait) + } else { + 0L } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4c49da87af9dc..63987dfb32695 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -52,8 +52,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)) // Submit tasks after maxRegisteredWaitingTime milliseconds // if minRegisteredRatio has not yet been reached - val maxRegisteredWaitingTime = - conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000) + val maxRegisteredWaitingTimeMs = + conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s") val createTime = System.currentTimeMillis() private val executorDataMap = new HashMap[String, ExecutorData] @@ -77,12 +77,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def onStart() { // Periodically revive offers to allow delay scheduling to work - val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000) + val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s") + reviveThread.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { Option(self).foreach(_.send(ReviveOffers)) } - }, 0, reviveInterval, TimeUnit.MILLISECONDS) + }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS) } override def receive: PartialFunction[Any, Unit] = { @@ -301,9 +302,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp s"reached minRegisteredResourcesRatio: $minRegisteredRatio") return true } - if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { + if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) { logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + - s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)") + s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)") return true } false diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 6c2c5261306e7..8e8cc7cc6389e 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -20,7 +20,6 @@ package org.apache.spark.util import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.Await import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.util.Try import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} import akka.pattern.ask @@ -66,7 +65,8 @@ private[spark] object AkkaUtils extends Logging { val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) - val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 120)) + val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout", + conf.get("spark.network.timeout", "120s")) val akkaFrameSize = maxFrameSizeBytes(conf) val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" @@ -78,8 +78,8 @@ private[spark] object AkkaUtils extends Logging { val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off" - val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000) - val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000) + val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s") + val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s") val secretKey = securityManager.getSecretKey() val isAuthOn = securityManager.isAuthenticationEnabled() @@ -102,14 +102,14 @@ private[spark] object AkkaUtils extends Logging { |akka.jvm-exit-on-fatal-error = off |akka.remote.require-cookie = "$requireCookie" |akka.remote.secure-cookie = "$secureCookie" - |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s - |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s + |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s + |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = $port |akka.remote.netty.tcp.tcp-nodelay = on - |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s + |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B |akka.remote.netty.tcp.execution-pool-size = $akkaThreads |akka.actor.default-dispatcher.throughput = $akkaBatchSize diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index 375ed430bde45..2bbfc988a99a8 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -76,7 +76,7 @@ private[spark] object MetadataCleanerType extends Enumeration { // initialization of StreamingContext. It's okay for users trying to configure stuff themselves. private[spark] object MetadataCleaner { def getDelaySeconds(conf: SparkConf): Int = { - conf.getInt("spark.cleaner.ttl", -1) + conf.getTimeAsSeconds("spark.cleaner.ttl", "-1").toInt } def getDelaySeconds( diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a541d660cd5c6..1029b0f9fce1e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer import java.util.{Properties, Locale, Random, UUID} -import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor} +import java.util.concurrent._ import javax.net.ssl.HttpsURLConnection import scala.collection.JavaConversions._ @@ -47,6 +47,7 @@ import tachyon.client.{TachyonFS, TachyonFile} import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} /** CallSite represents a place in user code. It can have a short and a long form. */ @@ -612,9 +613,10 @@ private[spark] object Utils extends Logging { } Utils.setupSecureURLConnection(uc, securityMgr) - val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000 - uc.setConnectTimeout(timeout) - uc.setReadTimeout(timeout) + val timeoutMs = + conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000 + uc.setConnectTimeout(timeoutMs) + uc.setReadTimeout(timeoutMs) uc.connect() val in = uc.getInputStream() downloadFile(url, in, targetFile, fileOverwrite) @@ -1018,6 +1020,22 @@ private[spark] object Utils extends Logging { ) } + /** + * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If + * no suffix is provided, the passed number is assumed to be in ms. + */ + def timeStringAsMs(str: String): Long = { + JavaUtils.timeStringAsMs(str) + } + + /** + * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If + * no suffix is provided, the passed number is assumed to be in seconds. + */ + def timeStringAsSeconds(str: String): Long = { + JavaUtils.timeStringAsSec(str) + } + /** * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. */ diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 3ded1e4af8742..6b3049b28cd5e 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -684,10 +684,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit .set("spark.dynamicAllocation.enabled", "true") .set("spark.dynamicAllocation.minExecutors", minExecutors.toString) .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString) - .set("spark.dynamicAllocation.schedulerBacklogTimeout", schedulerBacklogTimeout.toString) + .set("spark.dynamicAllocation.schedulerBacklogTimeout", + s"${schedulerBacklogTimeout.toString}s") .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", - sustainedSchedulerBacklogTimeout.toString) - .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString) + s"${sustainedSchedulerBacklogTimeout.toString}s") + .set("spark.dynamicAllocation.executorIdleTimeout", s"${executorIdleTimeout.toString}s") .set("spark.dynamicAllocation.testing", "true") val sc = new SparkContext(conf) contexts += sc diff --git a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala index 716f875d30b8a..02424c59d6831 100644 --- a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala @@ -260,8 +260,8 @@ class ConnectionManagerSuite extends FunSuite { test("sendMessageReliably timeout") { val clientConf = new SparkConf clientConf.set("spark.authenticate", "false") - val ackTimeout = 30 - clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}") + val ackTimeoutS = 30 + clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeoutS}s") val clientSecurityManager = new SecurityManager(clientConf) val manager = new ConnectionManager(0, clientConf, clientSecurityManager) @@ -272,7 +272,7 @@ class ConnectionManagerSuite extends FunSuite { val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager) managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { // sleep 60 sec > ack timeout for simulating server slow down or hang up - Thread.sleep(ackTimeout * 3 * 1000) + Thread.sleep(ackTimeoutS * 3 * 1000) None }) @@ -287,7 +287,7 @@ class ConnectionManagerSuite extends FunSuite { // Otherwise TimeoutExcepton is thrown from Await.result. // We expect TimeoutException is not thrown. intercept[IOException] { - Await.result(future, (ackTimeout * 2) second) + Await.result(future, (ackTimeoutS * 2) second) } manager.stop() diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 716d12c0762cf..6198cea46ddf8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler -import java.io.{ObjectInputStream, ObjectOutputStream, IOException} import java.util.Random import scala.collection.mutable.ArrayBuffer @@ -27,7 +26,7 @@ import org.scalatest.FunSuite import org.apache.spark._ import org.apache.spark.executor.TaskMetrics -import org.apache.spark.util.ManualClock +import org.apache.spark.util.{ManualClock, Utils} class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) extends DAGScheduler(sc) { @@ -152,7 +151,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { private val conf = new SparkConf - val LOCALITY_WAIT = conf.getLong("spark.locality.wait", 3000) + val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s") val MAX_TASK_FAILURES = 4 override def beforeEach() { @@ -240,7 +239,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None) - clock.advance(LOCALITY_WAIT) + clock.advance(LOCALITY_WAIT_MS) // Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 2) should // get chosen before the noPref task assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2) @@ -251,7 +250,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // Offer host2, exec3 again, at NODE_LOCAL level: we should get noPref task // after failing to find a node_Local task assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None) - clock.advance(LOCALITY_WAIT) + clock.advance(LOCALITY_WAIT_MS) assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3) } @@ -292,7 +291,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // Offer host1 again: nothing should get chosen assert(manager.resourceOffer("exec1", "host1", ANY) === None) - clock.advance(LOCALITY_WAIT) + clock.advance(LOCALITY_WAIT_MS) // Offer host1 again: second task (on host2) should get chosen assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) @@ -306,7 +305,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // Now that we've launched a local task, we should no longer launch the task for host3 assert(manager.resourceOffer("exec2", "host2", ANY) === None) - clock.advance(LOCALITY_WAIT) + clock.advance(LOCALITY_WAIT_MS) // After another delay, we can go ahead and launch that task non-locally assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3) @@ -338,7 +337,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // nothing should be chosen assert(manager.resourceOffer("exec1", "host1", ANY) === None) - clock.advance(LOCALITY_WAIT * 2) + clock.advance(LOCALITY_WAIT_MS * 2) // task 1 and 2 would be scheduled as nonLocal task assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) @@ -528,7 +527,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY))) // Set allowed locality to ANY - clock.advance(LOCALITY_WAIT * 3) + clock.advance(LOCALITY_WAIT_MS * 3) // Offer host3 // No task is scheduled if we restrict locality to RACK_LOCAL assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None) @@ -622,12 +621,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1) manager.speculatableTasks += 1 - clock.advance(LOCALITY_WAIT) + clock.advance(LOCALITY_WAIT_MS) // schedule the nonPref task assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2) // schedule the speculative task assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1) - clock.advance(LOCALITY_WAIT * 3) + clock.advance(LOCALITY_WAIT_MS * 3) // schedule non-local tasks assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3) } @@ -716,13 +715,13 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY))) assert(manager.resourceOffer("execA", "host1", ANY) !== None) - clock.advance(LOCALITY_WAIT * 4) + clock.advance(LOCALITY_WAIT_MS * 4) assert(manager.resourceOffer("execB.2", "host2", ANY) !== None) sched.removeExecutor("execA") sched.removeExecutor("execB.2") manager.executorLost("execA", "host1") manager.executorLost("execB.2", "host2") - clock.advance(LOCALITY_WAIT * 4) + clock.advance(LOCALITY_WAIT_MS * 4) sched.addExecutor("execC", "host3") manager.executorAdded() // Prior to the fix, this line resulted in an ArrayIndexOutOfBoundsException: diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index b4de90b65d545..ffa5162a31841 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -76,7 +76,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd conf.set("spark.storage.unrollMemoryThreshold", "512") // to make a replication attempt to inactive store fail fast - conf.set("spark.core.connection.ack.wait.timeout", "1") + conf.set("spark.core.connection.ack.wait.timeout", "1s") // to make cached peers refresh frequently conf.set("spark.storage.cachedPeersTtl", "10") diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 449fb87f111c4..fb97e650ff95c 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -23,6 +23,7 @@ import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStr import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} import java.text.DecimalFormatSymbols +import java.util.concurrent.TimeUnit import java.util.Locale import com.google.common.base.Charsets.UTF_8 @@ -35,7 +36,50 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf class UtilsSuite extends FunSuite with ResetSystemProperties { + + test("timeConversion") { + // Test -1 + assert(Utils.timeStringAsSeconds("-1") === -1) + + // Test zero + assert(Utils.timeStringAsSeconds("0") === 0) + + assert(Utils.timeStringAsSeconds("1") === 1) + assert(Utils.timeStringAsSeconds("1s") === 1) + assert(Utils.timeStringAsSeconds("1000ms") === 1) + assert(Utils.timeStringAsSeconds("1000000us") === 1) + assert(Utils.timeStringAsSeconds("1m") === TimeUnit.MINUTES.toSeconds(1)) + assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1)) + assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1)) + assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1)) + + assert(Utils.timeStringAsMs("1") === 1) + assert(Utils.timeStringAsMs("1ms") === 1) + assert(Utils.timeStringAsMs("1000us") === 1) + assert(Utils.timeStringAsMs("1s") === TimeUnit.SECONDS.toMillis(1)) + assert(Utils.timeStringAsMs("1m") === TimeUnit.MINUTES.toMillis(1)) + assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1)) + assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1)) + assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1)) + + // Test invalid strings + intercept[NumberFormatException] { + Utils.timeStringAsMs("This breaks 600s") + } + + intercept[NumberFormatException] { + Utils.timeStringAsMs("This breaks 600ds") + } + intercept[NumberFormatException] { + Utils.timeStringAsMs("600s This breaks") + } + + intercept[NumberFormatException] { + Utils.timeStringAsMs("This 123s breaks") + } + } + test("bytesToString") { assert(Utils.bytesToString(10) === "10.0 B") assert(Utils.bytesToString(1500) === "1500.0 B") diff --git a/docs/configuration.md b/docs/configuration.md index 7fe11475212b3..7169ec295ef7f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -35,9 +35,19 @@ val conf = new SparkConf() val sc = new SparkContext(conf) {% endhighlight %} -Note that we can have more than 1 thread in local mode, and in cases like spark streaming, we may actually -require one to prevent any sort of starvation issues. +Note that we can have more than 1 thread in local mode, and in cases like Spark Streaming, we may +actually require one to prevent any sort of starvation issues. +Properties that specify some time duration should be configured with a unit of time. +The following format is accepted: + + 25ms (milliseconds) + 5s (seconds) + 10m or 10min (minutes) + 3h (hours) + 5d (days) + 1y (years) + ## Dynamically Loading Spark Properties In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For instance, if you'd like to run the same application with different masters or different @@ -429,10 +439,10 @@ Apart from these, the following properties are also available, and may be useful
spark.shuffle.io.retryWait
maxRetries * retryWait
, by default 15 seconds.
+ (Netty only) How long to wait between retries of fetches. The maximum delay caused by retrying
+ is 15 seconds by default, calculated as maxRetries * retryWait
.
spark.executor.heartbeatInterval
spark.files.fetchTimeout
spark.akka.heartbeat.interval
spark.akka.heartbeat.pauses
spark.akka.timeout
spark.network.timeout
spark.core.connection.ack.wait.timeout
, spark.akka.timeout
,
+ Default timeout for all network interactions. This config will be used in place of
+ spark.core.connection.ack.wait.timeout
, spark.akka.timeout
,
spark.storage.blockManagerSlaveTimeoutMs
or
spark.shuffle.io.connectionTimeout
, if they are not configured.
spark.locality.wait
spark.locality.wait.node
, etc.
@@ -1024,10 +1034,9 @@ Apart from these, the following properties are also available, and may be useful
spark.scheduler.maxRegisteredResourcesWaitingTime
spark.scheduler.revive.interval
spark.speculation.interval
spark.dynamicAllocation.executorIdleTimeout
spark.dynamicAllocation.schedulerBacklogTimeout
spark.core.connection.ack.wait.timeout
spark.core.connection.auth.wait.timeout
spark.streaming.blockInterval
spark.yarn.am.waitTime