[SPARK-5931][CORE] Use consistent naming for time properties
I've added new utility methods to convert times specified as e.g. 120s, 240ms, or 360us into a consistent internal representation. I've updated usage of these constants throughout the code to be consistent.

I believe I've captured all usages of time-based properties throughout the code. I've also updated variable names in a number of places to reflect their units for clarity and updated documentation where appropriate.
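
As a rough sketch of what this parsing involves (illustrative only — the object and method names below are hypothetical, and the real implementation is the regex-based parser in org.apache.spark.network.util.JavaUtils):

import java.util.concurrent.TimeUnit

// Hypothetical sketch, not the actual JavaUtils code. The regex captures
// the entire alphabetic suffix, so overlapping units such as "m" (minutes)
// and "ms" (milliseconds) stay unambiguous.
object TimeStringSketch {
  private val suffixes = Map(
    "us" -> TimeUnit.MICROSECONDS,
    "ms" -> TimeUnit.MILLISECONDS,
    "s"  -> TimeUnit.SECONDS,
    "m"  -> TimeUnit.MINUTES,
    "h"  -> TimeUnit.HOURS,
    "d"  -> TimeUnit.DAYS)

  private val pattern = "(-?[0-9]+)([a-z]+)?".r

  /** Parse e.g. "120s", "240ms", "360us"; bare numbers fall back to defaultUnit. */
  def parse(str: String, targetUnit: TimeUnit, defaultUnit: TimeUnit): Long =
    str.trim.toLowerCase match {
      case pattern(num, null) => targetUnit.convert(num.toLong, defaultUnit)
      case pattern(num, suffix) if suffixes.contains(suffix) =>
        targetUnit.convert(num.toLong, suffixes(suffix))
      case _ => throw new NumberFormatException(s"Invalid time string: $str")
    }
}

Under this shape, parse("120s", TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS) yields 120000, and a bare "-1" passes through unchanged under the default unit.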

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Author: Ilya Ganelin <ilganeli@gmail.com>

Closes apache#5236 from ilganeli/SPARK-5931 and squashes the following commits:

4526c81 [Ilya Ganelin] Update configuration.md
de3bff9 [Ilya Ganelin] Fixing style errors
f5fafcd [Ilya Ganelin] Doc updates
951ca2d [Ilya Ganelin] Made the most recent round of changes
bc04e05 [Ilya Ganelin] Minor fixes and doc updates
25d3f52 [Ilya Ganelin] Minor nit fixes
642a06d [Ilya Ganelin] Fixed logic for invalid suffixes and added matching test
8927e66 [Ilya Ganelin] Fixed handling of -1
69fedcc [Ilya Ganelin] Added test for zero
dc7bd08 [Ilya Ganelin] Fixed error in exception handling
7d19cdd [Ilya Ganelin] Added fix for possible NPE
6f651a8 [Ilya Ganelin] Now using regexes to simplify code in parseTimeString. Introduces getTimeAsSec and getTimeAsMs methods in SparkConf. Updated documentation
cbd2ca6 [Ilya Ganelin] Formatting error
1a1122c [Ilya Ganelin] Formatting fixes and added m for use as minute formatter
4e48679 [Ilya Ganelin] Fixed priority order and mixed up conversions in a couple spots
d4efd26 [Ilya Ganelin] Added time conversion for yarn.scheduler.heartbeat.interval-ms
cbf41db [Ilya Ganelin] Got rid of thrown exceptions
1465390 [Ilya Ganelin] Nit
28187bf [Ilya Ganelin] Convert straight to seconds
ff40bfe [Ilya Ganelin] Updated tests to fix small bugs
19c31af [Ilya Ganelin] Added cleaner computation of time conversions in tests
6387772 [Ilya Ganelin] Updated suffix handling to handle overlap of units more gracefully
5193d5f [Ilya Ganelin] Resolved merge conflicts
76cfa27 [Ilya Ganelin] [SPARK-5931] Minor nit fixes
bf779b0 [Ilya Ganelin] Special handling of overlapping suffixes for java
dd0a680 [Ilya Ganelin] Updated scala code to call into java
b2fc965 [Ilya Ganelin] Replaced getOrDefault since it's not present in this version of Java
39164f9 [Ilya Ganelin] [SPARK-5931] Updated Java conversion to be similar to scala conversion. Updated conversions to clean up code a little using TimeUnit.convert. Added Unit tests
3b126e1 [Ilya Ganelin] Fixed conversion to US from seconds
1858197 [Ilya Ganelin] Fixed bug where all time was being converted to us instead of the appropriate units
bac9edf [Ilya Ganelin] More whitespace
8613631 [Ilya Ganelin] Whitespace
1c0c07c [Ilya Ganelin] Updated Java code to add day, minutes, and hours
647b5ac [Ilya Ganelin] Updated time conversion to use map iterator instead of if fall-through
70ac213 [Ilya Ganelin] Fixed remaining usages to be consistent. Updated Java-side time conversion
68f4e93 [Ilya Ganelin] Updated more files to clean up usage of default time strings
3a12dd8 [Ilya Ganelin] Updated host receiver
5232a36 [Ilya Ganelin] [SPARK-5931] Changed default behavior of time string conversion.
499bdf0 [Ilya Ganelin] Merge branch 'SPARK-5931' of github.com:ilganeli/spark into SPARK-5931
9e2547c [Ilya Ganelin] Reverting doc changes
8f741e1 [Ilya Ganelin] Update JavaUtils.java
34f87c2 [Ilya Ganelin] Update Utils.scala
9a29d8d [Ilya Ganelin] Fixed misuse of time in streaming context test
42477aa [Ilya Ganelin] Updated configuration doc with note on specifying time properties
cde9bff [Ilya Ganelin] Updated spark.streaming.blockInterval
c6a0095 [Ilya Ganelin] Updated spark.core.connection.auth.wait.timeout
5181597 [Ilya Ganelin] Updated spark.dynamicAllocation.schedulerBacklogTimeout
2fcc91c [Ilya Ganelin] Updated spark.dynamicAllocation.executorIdleTimeout
6d1518e [Ilya Ganelin] Updated spark.speculation.interval
3f1cfc8 [Ilya Ganelin] Updated spark.scheduler.revive.interval
3352d34 [Ilya Ganelin] Updated spark.scheduler.maxRegisteredResourcesWaitingTime
272c215 [Ilya Ganelin] Updated spark.locality.wait
7320c87 [Ilya Ganelin] updated spark.akka.heartbeat.interval
064ebd6 [Ilya Ganelin] Updated usage of spark.cleaner.ttl
21ef3dd [Ilya Ganelin] updated spark.shuffle.sasl.timeout
c9f5cad [Ilya Ganelin] Updated spark.shuffle.io.retryWait
4933fda [Ilya Ganelin] Updated usage of spark.storage.blockManagerSlaveTimeout
7db6d2a [Ilya Ganelin] Updated usage of spark.akka.timeout
404f8c3 [Ilya Ganelin] Updated usage of spark.core.connection.ack.wait.timeout
59bf9e1 [Ilya Ganelin] [SPARK-5931] Updated Utils and JavaUtils classes to add helper methods to handle time strings. Updated time strings in a few places to properly parse time
Ilya Ganelin authored and Andrew Or committed Apr 13, 2015
1 parent c5602bd commit c4ab255
Showing 25 changed files with 345 additions and 157 deletions.
@@ -80,16 +80,16 @@ private[spark] class ExecutorAllocationManager(
Integer.MAX_VALUE)

// How long there must be backlogged tasks for before an addition is triggered (seconds)
-  private val schedulerBacklogTimeout = conf.getLong(
-    "spark.dynamicAllocation.schedulerBacklogTimeout", 5)
+  private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.schedulerBacklogTimeout", "5s")

-  // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
-  private val sustainedSchedulerBacklogTimeout = conf.getLong(
-    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
+  // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
+  private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")

// How long an executor must be idle for before it is removed (seconds)
-  private val executorIdleTimeout = conf.getLong(
-    "spark.dynamicAllocation.executorIdleTimeout", 600)
+  private val executorIdleTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.executorIdleTimeout", "600s")

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -150,14 +150,14 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
-    if (schedulerBacklogTimeout <= 0) {
+    if (schedulerBacklogTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
}
-    if (sustainedSchedulerBacklogTimeout <= 0) {
+    if (sustainedSchedulerBacklogTimeoutS <= 0) {
throw new SparkException(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
}
-    if (executorIdleTimeout <= 0) {
+    if (executorIdleTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
// Require external shuffle service for dynamic allocation
@@ -262,8 +262,8 @@ private[spark] class ExecutorAllocationManager(
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeout seconds)")
addTime += sustainedSchedulerBacklogTimeout * 1000
s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
addTime += sustainedSchedulerBacklogTimeoutS * 1000
delta
} else {
0
@@ -351,7 +351,7 @@ private[spark] class ExecutorAllocationManager(
val removeRequestAcknowledged = testing || client.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
@@ -407,8 +407,8 @@ private[spark] class ExecutorAllocationManager(
private def onSchedulerBacklogged(): Unit = synchronized {
if (addTime == NOT_SET) {
logDebug(s"Starting timer to add executors because pending tasks " +
s"are building up (to expire in $schedulerBacklogTimeout seconds)")
addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000
}
}

@@ -431,8 +431,8 @@ private[spark] class ExecutorAllocationManager(
if (executorIds.contains(executorId)) {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
}
} else {
logWarning(s"Attempted to mark unknown executor $executorId idle")
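A minimal usage sketch of the renamed timeouts above, assuming nothing beyond a bare SparkConf — suffixed, millisecond-suffixed, and legacy unsuffixed values all resolve to the same five seconds:

import org.apache.spark.SparkConf

val conf = new SparkConf(false)  // skip loading system properties

conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
assert(conf.getTimeAsSeconds("spark.dynamicAllocation.schedulerBacklogTimeout", "5s") == 5L)

conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "5000ms")
assert(conf.getTimeAsSeconds("spark.dynamicAllocation.schedulerBacklogTimeout", "5s") == 5L)

conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "5")  // no suffix: seconds assumed
assert(conf.getTimeAsSeconds("spark.dynamicAllocation.schedulerBacklogTimeout", "5s") == 5L)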
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -62,14 +62,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)

// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
-  private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout").map(_.toLong * 1000).
-    getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120000))

+  private val slaveTimeoutMs =
+    sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
+  private val executorTimeoutMs =
+    sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000

// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
-  private val checkTimeoutIntervalMs =
-    sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000).
-      getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))
+  private val timeoutIntervalMs =
+    sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
+  private val checkTimeoutIntervalMs =
+    sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000

private var timeoutCheckingTask: ScheduledFuture[_] = null

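One subtlety in the wiring above: the millisecond-denominated legacy key is rendered as a string (e.g. "120000ms"), re-parsed as seconds, then scaled back to milliseconds. A small sketch of that round trip, assuming only java.util.concurrent.TimeUnit semantics:

import java.util.concurrent.TimeUnit

// 120000 ms -> "120000ms" -> 120 s -> 120000 ms. A sub-second remainder
// (say 1500 ms) would truncate to 1 s, i.e. 1000 ms, because
// TimeUnit.convert rounds toward zero.
val slaveTimeoutMs = 120000L
val networkTimeoutS = TimeUnit.MILLISECONDS.toSeconds(slaveTimeoutMs)
val executorTimeoutMs = networkTimeoutS * 1000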
36 changes: 36 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -174,6 +174,42 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
getOption(key).getOrElse(defaultValue)
}

+  /**
+   * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then seconds are assumed.
+   * @throws NoSuchElementException
+   */
+  def getTimeAsSeconds(key: String): Long = {
+    Utils.timeStringAsSeconds(get(key))
+  }
+
+  /**
+   * Get a time parameter as seconds, falling back to a default if not set. If no
+   * suffix is provided then seconds are assumed.
+   */
+  def getTimeAsSeconds(key: String, defaultValue: String): Long = {
+    Utils.timeStringAsSeconds(get(key, defaultValue))
+  }
+
+  /**
+   * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then milliseconds are assumed.
+   * @throws NoSuchElementException
+   */
+  def getTimeAsMs(key: String): Long = {
+    Utils.timeStringAsMs(get(key))
+  }
+
+  /**
+   * Get a time parameter as milliseconds, falling back to a default if not set. If no
+   * suffix is provided then milliseconds are assumed.
+   */
+  def getTimeAsMs(key: String, defaultValue: String): Long = {
+    Utils.timeStringAsMs(get(key, defaultValue))
+  }


/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key))
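A brief usage sketch of the four getters added above (the key names here are arbitrary examples):

import org.apache.spark.SparkConf

val conf = new SparkConf(false).set("spark.speculation.interval", "100ms")

conf.getTimeAsMs("spark.speculation.interval")          // 100
conf.getTimeAsSeconds("spark.speculation.interval")     // 0 -- TimeUnit conversion truncates toward zero
conf.getTimeAsMs("spark.example.missing.key", "250ms")  // 250, from the default
// conf.getTimeAsMs("spark.example.missing.key")        // would throw NoSuchElementException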
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -436,14 +436,14 @@ private[spark] class Executor(
* This thread stops running when the executor is stopped.
*/
private def startDriverHeartbeater(): Unit = {
-    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
+    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
val thread = new Thread() {
override def run() {
// Sleep a random interval so the heartbeats don't end up in sync
-          Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
+          Thread.sleep(intervalMs + (math.random * intervalMs).asInstanceOf[Int])
while (!isStopped) {
reportHeartBeat()
-            Thread.sleep(interval)
+            Thread.sleep(intervalMs)
}
}
}
@@ -82,7 +82,8 @@ private[nio] class ConnectionManager(
new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))

private val ackTimeout =
conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 120))
conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
conf.get("spark.network.timeout", "120s"))

// Get the thread counts from the Spark Configuration.
//
@@ -62,10 +62,10 @@ private[spark] class TaskSchedulerImpl(
val conf = sc.conf

// How often to check for speculative tasks
-  val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)
+  val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")

// Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
+  val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")

// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
@@ -143,8 +143,8 @@ private[spark] class TaskSchedulerImpl(
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
-      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
-        SPECULATION_INTERVAL milliseconds) {
+      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
+        SPECULATION_INTERVAL_MS milliseconds) {
Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
}
}
@@ -173,7 +173,7 @@ private[spark] class TaskSchedulerImpl(
this.cancel()
}
}
-      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
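The parsed millisecond count plugs straight into the Akka scheduler through Scala's duration DSL; a short sketch of that step:

import scala.concurrent.duration._

// SPECULATION_INTERVAL_MS is a plain Long of milliseconds; the postfix
// `milliseconds` conversion produces the FiniteDuration that
// actorSystem.scheduler.schedule expects.
val speculationIntervalMs: Long = 100L  // what the "100ms" default parses to
val interval: FiniteDuration = speculationIntervalMs.milliseconds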
@@ -848,15 +848,18 @@ private[spark] class TaskSetManager(
}

private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
val defaultWait = conf.get("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
conf.get("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
conf.get("spark.locality.wait.rack", defaultWait).toLong
case _ => 0L
val defaultWait = conf.get("spark.locality.wait", "3s")
val localityWaitKey = level match {
case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
case _ => null
}

if (localityWaitKey != null) {
conf.getTimeAsMs(localityWaitKey, defaultWait)
} else {
0L
}
}

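The restructured lookup keeps per-level overrides falling back to spark.locality.wait while accepting both suffixed and legacy unsuffixed values; a hedged sketch:

import org.apache.spark.SparkConf

val conf = new SparkConf(false)
conf.set("spark.locality.wait", "3s")       // overall default
conf.set("spark.locality.wait.node", "6s")  // per-level override

val defaultWait = conf.get("spark.locality.wait", "3s")
conf.getTimeAsMs("spark.locality.wait.node", defaultWait)  // 6000
conf.getTimeAsMs("spark.locality.wait.rack", defaultWait)  // 3000 (falls back to the default)

conf.set("spark.locality.wait", "3000")  // legacy bare value
conf.getTimeAsMs("spark.locality.wait")  // still 3000: no suffix means milliseconds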
@@ -52,8 +52,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
-  val maxRegisteredWaitingTime =
-    conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
+  val maxRegisteredWaitingTimeMs =
+    conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
val createTime = System.currentTimeMillis()

private val executorDataMap = new HashMap[String, ExecutorData]
@@ -77,12 +77,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

override def onStart() {
// Periodically revive offers to allow delay scheduling to work
-    val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
+    val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReviveOffers))
}
-    }, 0, reviveInterval, TimeUnit.MILLISECONDS)
+    }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}

override def receive: PartialFunction[Any, Unit] = {
@@ -301,9 +302,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
return true
}
-    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")
return true
}
false
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.util.Try

import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -66,7 +65,8 @@ private[spark] object AkkaUtils extends Logging {

val akkaThreads = conf.getInt("spark.akka.threads", 4)
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
-  val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 120))
+  val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
+    conf.get("spark.network.timeout", "120s"))
val akkaFrameSize = maxFrameSizeBytes(conf)
val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
Expand All @@ -78,8 +78,8 @@ private[spark] object AkkaUtils extends Logging {

val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"

-  val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
-  val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
+  val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
+  val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")

val secretKey = securityManager.getSecretKey()
val isAuthOn = securityManager.isAuthenticationEnabled()
@@ -102,14 +102,14 @@ private[spark] object AkkaUtils extends Logging {
|akka.jvm-exit-on-fatal-error = off
|akka.remote.require-cookie = "$requireCookie"
|akka.remote.secure-cookie = "$secureCookie"
-      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
-      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
+      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = $port
|akka.remote.netty.tcp.tcp-nodelay = on
-      |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
+      |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
@@ -76,7 +76,7 @@ private[spark] object MetadataCleanerType extends Enumeration {
// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
private[spark] object MetadataCleaner {
def getDelaySeconds(conf: SparkConf): Int = {
conf.getInt("spark.cleaner.ttl", -1)
conf.getTimeAsSeconds("spark.cleaner.ttl", "-1").toInt
}

def getDelaySeconds(
26 changes: 22 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
-import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection

import scala.collection.JavaConversions._
@@ -47,6 +47,7 @@ import tachyon.client.{TachyonFS, TachyonFile}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

/** CallSite represents a place in user code. It can have a short and a long form. */
@@ -612,9 +613,10 @@ private[spark] object Utils extends Logging {
}
Utils.setupSecureURLConnection(uc, securityMgr)

val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
uc.setConnectTimeout(timeout)
uc.setReadTimeout(timeout)
val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
uc.setConnectTimeout(timeoutMs)
uc.setReadTimeout(timeoutMs)
uc.connect()
val in = uc.getInputStream()
downloadFile(url, in, targetFile, fileOverwrite)
@@ -1018,6 +1020,22 @@ private[spark] object Utils extends Logging {
)
}

+  /**
+   * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
+   * no suffix is provided, the passed number is assumed to be in ms.
+   */
+  def timeStringAsMs(str: String): Long = {
+    JavaUtils.timeStringAsMs(str)
+  }
+
+  /**
+   * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If
+   * no suffix is provided, the passed number is assumed to be in seconds.
+   */
+  def timeStringAsSeconds(str: String): Long = {
+    JavaUtils.timeStringAsSec(str)
+  }

/**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
*/
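Taken together, a few hedged examples of what the new Utils helpers accept, based on the conversion rules described in the commit message:

Utils.timeStringAsMs("120s")      // 120000
Utils.timeStringAsMs("240ms")     // 240
Utils.timeStringAsMs("42")        // 42 -- no suffix: milliseconds assumed
Utils.timeStringAsSeconds("5m")   // 300 -- "m" parses as minutes
Utils.timeStringAsSeconds("-1")   // -1 -- sentinel values pass through unchanged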