Merge remote-tracking branch 'apache/master' into SPARK-3530
mengxr committed Nov 6, 2014
2 parents 1ef26e0 + 3d2b5bc commit f46e927
Showing 58 changed files with 2,099 additions and 357 deletions.
21 changes: 20 additions & 1 deletion LICENSE
@@ -754,7 +754,7 @@ SUCH DAMAGE.


========================================================================
For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
========================================================================
Copyright (C) 2008 The Android Open Source Project

@@ -771,6 +771,25 @@ See the License for the specific language governing permissions and
limitations under the License.


========================================================================
For LimitedInputStream
(network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
========================================================================
Copyright (C) 2007 The Guava Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


========================================================================
BSD-style licenses
========================================================================
@@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
verifyBounds()

// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
@@ -77,9 +76,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)

// How long an executor must be idle for before it is removed
private val removeThresholdSeconds = conf.getLong(
private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

validateSettings()

// Number of executors to add in the next round
private var numExecutorsToAdd = 1

@@ -103,17 +107,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Polling loop interval (ms)
private val intervalMillis: Long = 100

// Whether we are testing this class. This should only be used internally.
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock

/**
* Verify that the lower and upper bounds on the number of executors are valid.
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
*/
private def verifyBounds(): Unit = {
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
}
@@ -124,6 +125,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
if (schedulerBacklogTimeout <= 0) {
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
}
if (sustainedSchedulerBacklogTimeout <= 0) {
throw new SparkException(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
}
if (executorIdleTimeout <= 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
}

/**
@@ -254,7 +271,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
@@ -329,8 +346,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
}
}

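For context, the stricter validateSettings above means dynamic allocation now refuses to start without the external shuffle service. A hedged sketch of a SparkConf that would pass every check added in this hunk; the app name, bounds, and timeout values are illustrative placeholders, not values from the commit:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative configuration only: both executor bounds set, all timeouts positive,
    // and the external shuffle service enabled so killed executors do not lose shuffle files.
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example")                           // placeholder name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "20")
      .set("spark.dynamicAllocation.schedulerBacklogTimeout", "5")        // seconds, must be > 0
      .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "5")
      .set("spark.dynamicAllocation.executorIdleTimeout", "600")          // seconds, must be > 0
      .set("spark.shuffle.service.enabled", "true")                       // otherwise SparkException
    val sc = new SparkContext(conf)
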
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -287,7 +287,7 @@ object SparkEnv extends Logging {

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -161,7 +161,7 @@ private[spark] class Executor(
}

override def run() {
val startTime = System.currentTimeMillis()
val deserializeStartTime = System.currentTimeMillis()
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
@@ -206,7 +206,7 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()

for (m <- task.metrics) {
m.executorDeserializeTime = taskStart - startTime
m.executorDeserializeTime = taskStart - deserializeStartTime
m.executorRunTime = taskFinish - taskStart
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = afterSerialization - beforeSerialization
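
The rename above only clarifies which timestamp anchors the deserialization metric. A simplified, hypothetical sketch of the bookkeeping, with the deserialization and task body stubbed out, to show how the two reported windows relate:

    // Illustration only; Thread.sleep stands in for real work.
    val deserializeStartTime = System.currentTimeMillis()
    Thread.sleep(5)                                        // stand-in for deserializing the closure
    val taskStart = System.currentTimeMillis()
    Thread.sleep(50)                                       // stand-in for running the task
    val taskFinish = System.currentTimeMillis()

    val executorDeserializeTime = taskStart - deserializeStartTime   // what the UI now reports
    val executorRunTime = taskFinish - taskStart
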
@@ -115,7 +115,7 @@ private[spark] class FixedLengthBinaryRecordReader
if (currentPosition < splitEnd) {
// setup a buffer to store the record
val buffer = recordValue.getBytes
fileInputStream.read(buffer, 0, recordLength)
fileInputStream.readFully(buffer)
// update our current position
currentPosition = currentPosition + recordLength
// return true
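
The switch from read to readFully guards against short reads: a single read call may legally return fewer than recordLength bytes, silently leaving part of the record buffer stale. A generic, hedged illustration of the same pattern with plain java.io streams (the file path and record length are placeholders):

    import java.io.{DataInputStream, EOFException, FileInputStream}

    val recordLength = 1024
    val buffer = new Array[Byte](recordLength)
    val in = new DataInputStream(new FileInputStream("records.bin"))   // placeholder path
    try {
      // read(buffer, 0, recordLength) may return early; readFully keeps reading until
      // the whole buffer is filled, or throws EOFException if the stream ends first.
      in.readFully(buffer)
    } catch {
      case _: EOFException => println("stream ended before a full record was read")
    } finally {
      in.close()
    }
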
@@ -93,7 +93,7 @@ private[spark] class CoarseMesosSchedulerBackend(
setDaemon(true)
override def run() {
val scheduler = CoarseMesosSchedulerBackend.this
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(sc.appName).build()
val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
try { {
val ret = driver.run()
@@ -242,8 +242,7 @@ private[spark] class CoarseMesosSchedulerBackend(
for (r <- res if r.getName == name) {
return r.getScalar.getValue
}
// If we reached here, no resource with the required name was present
throw new IllegalArgumentException("No resource called " + name + " in " + res)
0
}

/** Build a Mesos resource protobuf object */
@@ -72,7 +72,7 @@ private[spark] class MesosSchedulerBackend(
setDaemon(true)
override def run() {
val scheduler = MesosSchedulerBackend.this
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(sc.appName).build()
val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
try {
val ret = driver.run()
@@ -278,8 +278,7 @@ private[spark] class MesosSchedulerBackend(
for (r <- res if r.getName == name) {
return r.getScalar.getValue
}
// If we reached here, no resource with the required name was present
throw new IllegalArgumentException("No resource called " + name + " in " + res)
0
}

/** Turn a Spark TaskDescription into a Mesos task */
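
Both Mesos backends now register the framework as the submitting user instead of an empty string, and treat a missing named resource in an offer as zero rather than throwing. A hedged sketch of the FrameworkInfo construction using the Mesos protobuf builder, with literal placeholders where the real code uses SparkContext fields:

    import org.apache.mesos.Protos.FrameworkInfo

    // Illustrative only: "spark-user" and "my-spark-app" stand in for
    // sc.sparkUser and sc.appName respectively.
    val fwInfo = FrameworkInfo.newBuilder()
      .setUser("spark-user")
      .setName("my-spark-app")
      .build()
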
20 changes: 13 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,7 +40,6 @@ import org.apache.spark.network.util.{ConfigProvider, TransportConf}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.util._

private[spark] sealed trait BlockValues
@@ -72,7 +71,8 @@ private[spark] class BlockManager(
val conf: SparkConf,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService)
blockTransferService: BlockTransferService,
securityManager: SecurityManager)
extends BlockDataManager with Logging {

val diskBlockManager = new DiskBlockManager(this, conf)
@@ -96,7 +96,12 @@ private[spark] class BlockManager(

private[spark]
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
private val externalShuffleServicePort = conf.getInt("spark.shuffle.service.port", 7337)

// Port used by the external shuffle service. In Yarn mode, this may already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
private val externalShuffleServicePort =
Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt

// Check that we're not using external shuffle service with consolidated shuffle files.
if (externalShuffleServiceEnabled
&& conf.getBoolean("spark.shuffle.consolidateFiles", false)
@@ -115,7 +120,8 @@ private[spark] class BlockManager(
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf))
new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
securityManager.isAuthenticationEnabled())
} else {
blockTransferService
}
@@ -166,9 +172,10 @@ private[spark] class BlockManager(
conf: SparkConf,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService) = {
blockTransferService: BlockTransferService,
securityManager: SecurityManager) = {
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
conf, mapOutputTracker, shuffleManager, blockTransferService)
conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
}

/**
@@ -219,7 +226,6 @@ private[spark] class BlockManager(
return
} catch {
case e: Exception if i < MAX_ATTEMPTS =>
val attemptsRemaining =
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}}"
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
Thread.sleep(SLEEP_TIME_SECS * 1000)
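
With the SecurityManager now threaded through the BlockManager, the ExternalShuffleClient can authenticate shuffle fetches via SASL when authentication is on. A hedged sketch of the settings involved; the secret is a placeholder, and on YARN the port may instead come from the NodeManager's Hadoop configuration, as the comment above notes:

    import org.apache.spark.SparkConf

    // Illustrative settings only.
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")      // use the external shuffle service
      .set("spark.shuffle.service.port", "7337")         // default resolved via getSparkOrYarnConfig
      .set("spark.authenticate", "true")                 // makes isAuthenticationEnabled() return true
      .set("spark.authenticate.secret", "placeholder-secret")
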
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,6 +24,9 @@ private[spark] object ToolTips {
scheduler delay is large, consider decreasing the size of tasks or decreasing the size
of task results."""

val TASK_DESERIALIZATION_TIME =
"""Time spent deserializating the task closure on the executor."""

val INPUT = "Bytes read from Hadoop or from Spark storage."

val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
31 changes: 30 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -112,6 +112,13 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Scheduler Delay</span>
</span>
</li>
<li>
<span data-toggle="tooltip"
title={ToolTips.TASK_DESERIALIZATION_TIME} data-placement="right">
<input type="checkbox" name={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}/>
<span class="additional-metric-title">Task Deserialization Time</span>
</span>
</li>
<li>
<span data-toggle="tooltip"
title={ToolTips.GC_TIME} data-placement="right">
Expand Down Expand Up @@ -147,6 +154,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
("GC Time", TaskDetailsClassNames.GC_TIME),
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
@@ -179,6 +187,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
}
}

val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorDeserializeTime.toDouble
}
val deserializationQuantiles =
<td>
<span data-toggle="tooltip" title={ToolTips.TASK_DESERIALIZATION_TIME}
data-placement="right">
Task Deserialization Time
</span>
</td> +: getFormattedTimeQuantiles(deserializationTimes)

val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
}
@@ -266,6 +285,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val listings: Seq[Seq[Node]] = Seq(
<tr>{serviceQuantiles}</tr>,
<tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
<tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
{deserializationQuantiles}
</tr>
<tr class={TaskDetailsClassNames.GC_TIME}>{gcQuantiles}</tr>,
<tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
{serializationQuantiles}
@@ -314,6 +336,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
val schedulerDelay = metrics.map(getSchedulerDelay(info, _)).getOrElse(0L)
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
val gettingResultTime = info.gettingResultTime

@@ -367,6 +390,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
class={TaskDetailsClassNames.SCHEDULER_DELAY}>
{UIUtils.formatDuration(schedulerDelay.toLong)}
</td>
<td sorttable_customkey={taskDeserializationTime.toString}
class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
{UIUtils.formatDuration(taskDeserializationTime.toLong)}
</td>
<td sorttable_customkey={gcTime.toString} class={TaskDetailsClassNames.GC_TIME}>
{if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
@@ -424,6 +451,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
(info.finishTime - info.launchTime)
}
}
totalExecutionTime - metrics.executorRunTime
val executorOverhead = (metrics.executorDeserializeTime +
metrics.resultSerializationTime)
totalExecutionTime - metrics.executorRunTime - executorOverhead
}
}
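
The last hunk above changes how scheduler delay is derived: task deserialization and result serialization time are subtracted so executor-side overhead is no longer misattributed to scheduling. A small worked sketch with hypothetical millisecond values:

    // Hypothetical timings (ms) to illustrate the revised formula.
    val totalExecutionTime = 1000L            // finishTime - launchTime
    val executorRunTime = 700L
    val executorDeserializeTime = 120L
    val resultSerializationTime = 30L

    val executorOverhead = executorDeserializeTime + resultSerializationTime       // 150 ms
    val schedulerDelay = totalExecutionTime - executorRunTime - executorOverhead   // 150 ms
    // The old formula, totalExecutionTime - executorRunTime, would have reported 300 ms.
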
@@ -24,6 +24,7 @@ package org.apache.spark.ui.jobs
private object TaskDetailsClassNames {
val SCHEDULER_DELAY = "scheduler_delay"
val GC_TIME = "gc_time"
val TASK_DESERIALIZATION_TIME = "deserialization_time"
val RESULT_SERIALIZATION_TIME = "serialization_time"
val GETTING_RESULT_TIME = "getting_result_time"
}
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -45,6 +45,7 @@ import org.json4s._
import tachyon.client.{TachyonFile,TachyonFS}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

/** CallSite represents a place in user code. It can have a short and a long form. */
@@ -1780,6 +1781,21 @@ private[spark] object Utils extends Logging {
val manifest = new JarManifest(manifestUrl.openStream())
manifest.getMainAttributes.getValue(Name.IMPLEMENTATION_VERSION)
}.getOrElse("Unknown")

/**
* Return the value of a config either through the SparkConf or the Hadoop configuration
* if this is Yarn mode. In the latter case, this defaults to the value set through SparkConf
* if the key is not set in the Hadoop configuration.
*/
def getSparkOrYarnConfig(conf: SparkConf, key: String, default: String): String = {
val sparkValue = conf.get(key, default)
if (SparkHadoopUtil.get.isYarnMode) {
SparkHadoopUtil.get.newConfiguration(conf).get(key, sparkValue)
} else {
sparkValue
}
}

}

/**
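
A hedged usage sketch of the new helper; note that Utils is private[spark], so a call like this only compiles from inside Spark's own packages, and the key shown is simply the one BlockManager resolves above:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    // On YARN, prefer the value set in the Hadoop configuration (where the shuffle service
    // runs inside the NodeManager); otherwise fall back to the SparkConf value or the default.
    val conf = new SparkConf()
    val shufflePort =
      Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt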
