diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
new file mode 100644
index 0000000000000..646496f313507
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorAdded;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+/**
+ * Java clients should extend this class instead of implementing
+ * SparkListener directly. This prevents Java clients from breaking
+ * when new events are added to the SparkListener trait.
+ *
+ * This is a concrete class rather than an abstract one so that new
+ * events must be added to both SparkListener and this adapter in
+ * lockstep.
+ */
+public class JavaSparkListener implements SparkListener {
+
+ @Override
+ public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
+
+ @Override
+ public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
+
+ @Override
+ public void onTaskStart(SparkListenerTaskStart taskStart) { }
+
+ @Override
+ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
+
+ @Override
+ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }
+
+ @Override
+ public void onJobStart(SparkListenerJobStart jobStart) { }
+
+ @Override
+ public void onJobEnd(SparkListenerJobEnd jobEnd) { }
+
+ @Override
+ public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
+
+ @Override
+ public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
+
+ @Override
+ public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
+
+ @Override
+ public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
+
+ @Override
+ public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
+
+ @Override
+ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
+
+ @Override
+ public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
+
+ @Override
+ public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }
+
+ @Override
+ public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
+}
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 80da62c44edc5..a0c0372b7f0ef 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
- context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+ val inputMetrics = blockResult.inputMetrics
+ val existingMetrics = context.taskMetrics
+ .getInputMetricsForReadMethod(inputMetrics.readMethod)
+ existingMetrics.addBytesRead(inputMetrics.bytesRead)
+
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
case None =>
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ff5d796ee2766..6a354ed4d1486 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -520,10 +520,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Distribute a local Scala collection to form an RDD.
*
- * @note Parallelize acts lazily. If `seq` is a mutable collection and is
- * altered after the call to parallelize and before the first action on the
- * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
- * the argument to avoid this.
+ * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
+ * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
+ * modified collection. Pass a copy of the argument to avoid this.
*/
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 2e1e52906ceeb..e5873ce724b9f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.log4j.Level
-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{IntParam, MemoryParam}
/**
* Command-line parser for the driver client.
@@ -51,8 +51,8 @@ private[spark] class ClientArguments(args: Array[String]) {
parse(args.toList)
def parse(args: List[String]): Unit = args match {
- case ("--cores" | "-c") :: value :: tail =>
- cores = value.toInt
+ case ("--cores" | "-c") :: IntParam(value) :: tail =>
+ cores = value
parse(tail)
case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 955cbd6dab96d..050ba91eb2bc3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -200,6 +200,7 @@ object SparkSubmit {
// Yarn cluster only
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
+ OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 47059b08a397f..81ec08cb6d501 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -108,6 +108,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
.orElse(sparkProperties.get("spark.driver.memory"))
.orElse(env.get("SPARK_DRIVER_MEMORY"))
.orNull
+ driverCores = Option(driverCores)
+ .orElse(sparkProperties.get("spark.driver.cores"))
+ .orNull
executorMemory = Option(executorMemory)
.orElse(sparkProperties.get("spark.executor.memory"))
.orElse(env.get("SPARK_EXECUTOR_MEMORY"))
@@ -406,6 +409,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| --total-executor-cores NUM Total cores for all executors.
|
| YARN-only:
+ | --driver-cores NUM Number of cores used by the driver, only in cluster mode
+ | (Default: 1).
| --executor-cores NUM Number of cores per executor (Default: 1).
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --num-executors NUM Number of executors to launch (Default: 2).
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index ad7d81747c377..ede0a9dbefb8d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -38,8 +38,8 @@ private[spark] class ApplicationInfo(
extends Serializable {
@transient var state: ApplicationState.Value = _
- @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
- @transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
+ @transient var executors: mutable.HashMap[Int, ExecutorDesc] = _
+ @transient var removedExecutors: ArrayBuffer[ExecutorDesc] = _
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
@@ -55,12 +55,12 @@ private[spark] class ApplicationInfo(
private def init() {
state = ApplicationState.WAITING
- executors = new mutable.HashMap[Int, ExecutorInfo]
+ executors = new mutable.HashMap[Int, ExecutorDesc]
coresGranted = 0
endTime = -1L
appSource = new ApplicationSource(this)
nextExecutorId = 0
- removedExecutors = new ArrayBuffer[ExecutorInfo]
+ removedExecutors = new ArrayBuffer[ExecutorDesc]
}
private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -75,14 +75,14 @@ private[spark] class ApplicationInfo(
}
}
- def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
- val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
+ def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = {
+ val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
executors(exec.id) = exec
coresGranted += cores
exec
}
- def removeExecutor(exec: ExecutorInfo) {
+ def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.id)) {
removedExecutors += executors(exec.id)
executors -= exec.id
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
similarity index 95%
rename from core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
rename to core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
index d417070c51016..5d620dfcabad5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
-private[spark] class ExecutorInfo(
+private[spark] class ExecutorDesc(
val id: Int,
val application: ApplicationInfo,
val worker: WorkerInfo,
@@ -37,7 +37,7 @@ private[spark] class ExecutorInfo(
override def equals(other: Any): Boolean = {
other match {
- case info: ExecutorInfo =>
+ case info: ExecutorDesc =>
fullId == info.fullId &&
worker.id == info.worker.id &&
cores == info.cores &&
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4b631ec639071..d92d99310a583 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -581,7 +581,7 @@ private[spark] class Master(
}
}
- def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
+ def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 473ddc23ff0f3..e94aae93e4495 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -38,7 +38,7 @@ private[spark] class WorkerInfo(
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
- @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
+ @transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
@transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
@transient var state: WorkerState.Value = _
@transient var coresUsed: Int = _
@@ -70,13 +70,13 @@ private[spark] class WorkerInfo(
host + ":" + port
}
- def addExecutor(exec: ExecutorInfo) {
+ def addExecutor(exec: ExecutorDesc) {
executors(exec.fullId) = exec
coresUsed += exec.cores
memoryUsed += exec.memory
}
- def removeExecutor(exec: ExecutorInfo) {
+ def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.fullId)) {
executors -= exec.fullId
coresUsed -= exec.cores
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 4588c130ef439..3aae2b95d7396 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -27,7 +27,7 @@ import org.json4s.JValue
import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.ExecutorInfo
+import org.apache.spark.deploy.master.ExecutorDesc
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils
@@ -109,7 +109,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
}
- private def executorRow(executor: ExecutorInfo): Seq[Node] = {
+ private def executorRow(executor: ExecutorDesc): Seq[Node] = {
<tr>
<td>{executor.id}</td>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d481c0e62878a..42566d1a14093 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -203,10 +203,10 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()
for (m <- task.metrics) {
- m.incExecutorDeserializeTime(taskStart - deserializeStartTime)
- m.incExecutorRunTime(taskFinish - taskStart)
- m.incJvmGCTime(gcTime - startGCTime)
- m.incResultSerializationTime(afterSerialization - beforeSerialization)
+ m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
+ m.setExecutorRunTime(taskFinish - taskStart)
+ m.setJvmGCTime(gcTime - startGCTime)
+ m.setResultSerializationTime(afterSerialization - beforeSerialization)
}
val accumUpdates = Accumulators.values
@@ -257,8 +257,8 @@ private[spark] class Executor(
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
- m.incExecutorRunTime(serviceTime)
- m.incJvmGCTime(gcTime - startGCTime)
+ m.setExecutorRunTime(serviceTime)
+ m.setJvmGCTime(gcTime - startGCTime)
}
val reason = new ExceptionFailure(t, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
@@ -376,10 +376,12 @@ private[spark] class Executor(
val curGCTime = gcTime
for (taskRunner <- runningTasks.values()) {
- if (!taskRunner.attemptedTask.isEmpty) {
+ if (taskRunner.attemptedTask.nonEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
- metrics.updateShuffleReadMetrics
- metrics.incJvmGCTime(curGCTime - taskRunner.startGCTime)
+ metrics.updateShuffleReadMetrics()
+ metrics.updateInputMetrics()
+ metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
+
if (isLocal) {
// JobProgressListener will hold a reference to it during
// onExecutorMetricsUpdate(), then JobProgressListener cannot see
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index dcef28888c215..571e684b66a31 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,11 @@
package org.apache.spark.executor
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.executor.DataReadMethod.DataReadMethod
+
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.annotation.DeveloperApi
@@ -48,8 +53,7 @@ class TaskMetrics extends Serializable {
*/
private var _executorDeserializeTime: Long = _
def executorDeserializeTime = _executorDeserializeTime
- private[spark] def incExecutorDeserializeTime(value: Long) = _executorDeserializeTime += value
- private[spark] def decExecutorDeserializeTime(value: Long) = _executorDeserializeTime -= value
+ private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value
/**
@@ -57,16 +61,14 @@ class TaskMetrics extends Serializable {
*/
private var _executorRunTime: Long = _
def executorRunTime = _executorRunTime
- private[spark] def incExecutorRunTime(value: Long) = _executorRunTime += value
- private[spark] def decExecutorRunTime(value: Long) = _executorRunTime -= value
+ private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value
/**
* The number of bytes this task transmitted back to the driver as the TaskResult
*/
private var _resultSize: Long = _
def resultSize = _resultSize
- private[spark] def incResultSize(value: Long) = _resultSize += value
- private[spark] def decResultSize(value: Long) = _resultSize -= value
+ private[spark] def setResultSize(value: Long) = _resultSize = value
/**
@@ -74,16 +76,14 @@ class TaskMetrics extends Serializable {
*/
private var _jvmGCTime: Long = _
def jvmGCTime = _jvmGCTime
- private[spark] def incJvmGCTime(value: Long) = _jvmGCTime += value
- private[spark] def decJvmGCTime(value: Long) = _jvmGCTime -= value
+ private[spark] def setJvmGCTime(value: Long) = _jvmGCTime = value
/**
* Amount of time spent serializing the task result
*/
private var _resultSerializationTime: Long = _
def resultSerializationTime = _resultSerializationTime
- private[spark] def incResultSerializationTime(value: Long) = _resultSerializationTime += value
- private[spark] def decResultSerializationTime(value: Long) = _resultSerializationTime -= value
+ private[spark] def setResultSerializationTime(value: Long) = _resultSerializationTime = value
/**
* The number of in-memory bytes spilled by this task
@@ -105,7 +105,17 @@ class TaskMetrics extends Serializable {
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
* are stored here.
*/
- var inputMetrics: Option[InputMetrics] = None
+ private var _inputMetrics: Option[InputMetrics] = None
+
+ def inputMetrics = _inputMetrics
+
+ /**
+ * This should only be used when recreating TaskMetrics, not when updating input metrics in
+ * executors.
+ */
+ private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) {
+ _inputMetrics = inputMetrics
+ }
/**
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
@@ -158,6 +168,30 @@ class TaskMetrics extends Serializable {
readMetrics
}
+ /**
+ * Returns the input metrics object that the task should use. Currently, if
+ * there exists an input metric with the same readMethod, we return that one
+ * so the caller can accumulate bytes read. If the readMethod differs from the
+ * one previously seen by this task, we return a new InputMetrics but don't
+ * record it.
+ *
+ * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
+ * we can store all the different inputMetrics (one per readMethod).
+ */
+ private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod):
+ InputMetrics = synchronized {
+ _inputMetrics match {
+ case None =>
+ val metrics = new InputMetrics(readMethod)
+ _inputMetrics = Some(metrics)
+ metrics
+ case Some(metrics @ InputMetrics(method)) if method == readMethod =>
+ metrics
+ case Some(InputMetrics(method)) =>
+ new InputMetrics(readMethod)
+ }
+ }
+
/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
@@ -171,6 +205,10 @@ class TaskMetrics extends Serializable {
}
_shuffleReadMetrics = Some(merged)
}
+
+ private[spark] def updateInputMetrics() = synchronized {
+ inputMetrics.foreach(_.updateBytesRead())
+ }
}
private[spark] object TaskMetrics {
@@ -204,13 +242,38 @@ object DataWriteMethod extends Enumeration with Serializable {
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {
+
+ private val _bytesRead: AtomicLong = new AtomicLong()
+
/**
* Total bytes read.
*/
- private var _bytesRead: Long = _
- def bytesRead = _bytesRead
- private[spark] def incBytesRead(value: Long) = _bytesRead += value
- private[spark] def decBytesRead(value: Long) = _bytesRead -= value
+ def bytesRead: Long = _bytesRead.get()
+ @volatile @transient var bytesReadCallback: Option[() => Long] = None
+
+ /**
+ * Adds additional bytes read for this read method.
+ */
+ def addBytesRead(bytes: Long) = {
+ _bytesRead.addAndGet(bytes)
+ }
+
+ /**
+ * Invoke the bytesReadCallback, if defined, and update bytesRead with its result.
+ */
+ def updateBytesRead() {
+ bytesReadCallback.foreach { c =>
+ _bytesRead.set(c())
+ }
+ }
+
+ /**
+ * Register a function that can be called to get up-to-date information on how many
+ * bytes the task has read from an input source.
+ */
+ def setBytesReadCallback(f: Option[() => Long]) {
+ bytesReadCallback = f
+ }
}
/**
@@ -224,8 +287,7 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
*/
private var _bytesWritten: Long = _
def bytesWritten = _bytesWritten
- private[spark] def incBytesWritten(value : Long) = _bytesWritten += value
- private[spark] def decBytesWritten(value : Long) = _bytesWritten -= value
+ private[spark] def setBytesWritten(value : Long) = _bytesWritten = value
}
/**
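
A minimal sketch of the accessor's intended semantics (getInputMetricsForReadMethod is private[spark], so this only compiles from inside the org.apache.spark.executor package): lookups with the same read method accumulate into the single recorded InputMetrics, while a different read method yields a fresh, unrecorded instance until SPARK-5225 is addressed.

    import org.apache.spark.executor.{DataReadMethod, TaskMetrics}

    val metrics = new TaskMetrics
    // Same read method: bytes accumulate in the recorded InputMetrics.
    metrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop).addBytesRead(100L)
    metrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop).addBytesRead(50L)
    assert(metrics.inputMetrics.get.bytesRead == 150L)
    // Different read method: a new InputMetrics is returned but not recorded.
    metrics.getInputMetricsForReadMethod(DataReadMethod.Memory).addBytesRead(10L)
    assert(metrics.inputMetrics.get.readMethod == DataReadMethod.Hadoop)
    assert(metrics.inputMetrics.get.bytesRead == 150L)
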
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index ff4be037c8c9b..056aef0bc210a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,18 +213,19 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics
+ .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
- split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
- } else {
- None
- }
- if (bytesReadCallback.isDefined) {
- context.taskMetrics.inputMetrics = Some(inputMetrics)
- }
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+ split.inputSplit.value match {
+ case split: FileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
+ case _ => None
+ }
+ )
+ inputMetrics.setBytesReadCallback(bytesReadCallback)
var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
@@ -237,8 +238,6 @@ class HadoopRDD[K, V](
val key: K = reader.createKey()
val value: V = reader.createValue()
- var recordsSinceMetricsUpdate = 0
-
override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -247,15 +246,6 @@ class HadoopRDD[K, V](
finished = true
}
- // Update bytes read metric every few records
- if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
- && bytesReadCallback.isDefined) {
- recordsSinceMetricsUpdate = 0
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.incBytesRead(bytesReadFn())
- } else {
- recordsSinceMetricsUpdate += 1
- }
(key, value)
}
@@ -263,14 +253,12 @@ class HadoopRDD[K, V](
try {
reader.close()
if (bytesReadCallback.isDefined) {
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.incBytesRead(bytesReadFn())
+ inputMetrics.updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.incBytesRead(split.inputSplit.value.getLength)
- context.taskMetrics.inputMetrics = Some(inputMetrics)
+ inputMetrics.addBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 3a8ac80b005a4..7b0e3c87ccff4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -109,18 +109,19 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics
+ .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
- split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
- } else {
- None
- }
- if (bytesReadCallback.isDefined) {
- context.taskMetrics.inputMetrics = Some(inputMetrics)
- }
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+ split.serializableHadoopSplit.value match {
+ case split: FileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
+ case _ => None
+ }
+ )
+ inputMetrics.setBytesReadCallback(bytesReadCallback)
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
@@ -154,33 +155,19 @@ class NewHadoopRDD[K, V](
}
havePair = false
- // Update bytes read metric every few records
- if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
- && bytesReadCallback.isDefined) {
- recordsSinceMetricsUpdate = 0
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.incBytesRead(bytesReadFn())
- } else {
- recordsSinceMetricsUpdate += 1
- }
-
(reader.getCurrentKey, reader.getCurrentValue)
}
private def close() {
try {
reader.close()
-
- // Update metrics with final amount
if (bytesReadCallback.isDefined) {
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.incBytesRead(bytesReadFn())
+ inputMetrics.updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
- context.taskMetrics.inputMetrics = Some(inputMetrics)
+ inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index e8163fbc0c052..0f37d830ef34f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1007,7 +1007,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
- bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
+ bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
1
} : Int
@@ -1079,7 +1079,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.close()
}
writer.commit()
- bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
+ bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
}
self.context.runJob(self, writeToFile)
@@ -1102,7 +1102,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
&& bytesWrittenCallback.isDefined) {
- bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
+ bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
}
}
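
The write side follows the same contract: bytesWrittenCallback returns a cumulative count, so each periodic update replaces the running total, and invoking it more than once between writes cannot double-count. A sketch (setBytesWritten is private[spark], so this assumes package-internal access):

    import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}

    val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
    var fsBytesWritten = 0L // stand-in for the FileSystem bytes-written statistic
    val bytesWrittenCallback: Option[() => Long] = Some(() => fsBytesWritten)

    fsBytesWritten += 1024
    bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
    fsBytesWritten += 1024
    bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
    assert(outputMetrics.bytesWritten == 2048L) // replaced each time, never doubled
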
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 87b22de6ae697..f12d0cffaba34 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -111,7 +111,8 @@ private object ParallelCollectionRDD {
/**
* Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
- * it efficient to run Spark over RDDs representing large sets of numbers.
+ * it efficient to run Spark over RDDs representing large sets of numbers. If the collection
+ * is an inclusive Range, we use an inclusive Range for the last slice so its endpoint is kept.
*/
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
@@ -127,19 +128,15 @@ private object ParallelCollectionRDD {
})
}
seq match {
- case r: Range.Inclusive => {
- val sign = if (r.step < 0) {
- -1
- } else {
- 1
- }
- slice(new Range(
- r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
- }
case r: Range => {
- positions(r.length, numSlices).map({
- case (start, end) =>
+ positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
+ // If the range is inclusive, use inclusive range for the last slice
+ if (r.isInclusive && index == numSlices - 1) {
+ new Range.Inclusive(r.start + start * r.step, r.end, r.step)
+ } else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+ }
}).toSeq.asInstanceOf[Seq[Seq[T]]]
}
case nr: NumericRange[_] => {
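
For example, slicing 1 to 10 into three parts now preserves the endpoint by encoding only the last slice as an inclusive Range (slice is on a private object, so this assumes access from within org.apache.spark.rdd):

    // positions(10, 3) yields (0,3), (3,6), (6,10); only the last slice is
    // built with Range.Inclusive, so the endpoint 10 is no longer dropped.
    val slices = ParallelCollectionRDD.slice(1 to 10, 3)
    assert(slices == Seq(1 until 4, 4 until 7, 7 to 10))
    assert(slices.map(_.size).sum == 10)
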
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8cb15918baa8c..3bca59e0646d0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -661,7 +661,7 @@ class DAGScheduler(
// completion events or stage abort
stageIdToStage -= s.id
jobIdToStageIds -= job.jobId
- listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
}
}
@@ -710,7 +710,7 @@ class DAGScheduler(
stage.latestInfo.stageFailed(stageFailedMessage)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}
- listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
}
}
@@ -749,9 +749,11 @@ class DAGScheduler(
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val shouldRunLocally =
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
+ val jobSubmissionTime = clock.getTime()
if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
- listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
+ listenerBus.post(
+ SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
runLocally(job)
} else {
jobIdToActiveJob(jobId) = job
@@ -759,7 +761,8 @@ class DAGScheduler(
finalStage.resultOfJob = Some(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
- listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
+ listenerBus.post(
+ SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}
}
@@ -965,7 +968,8 @@ class DAGScheduler(
if (job.numFinished == job.numPartitions) {
markStageAsFinished(stage)
cleanupStateForJobAndIndependentStages(job)
- listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
+ listenerBus.post(
+ SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
}
// taskSucceeded runs some user code that might throw an exception. Make sure
@@ -1234,7 +1238,7 @@ class DAGScheduler(
if (ableToCancelStages) {
job.listener.jobFailed(error)
cleanupStateForJobAndIndependentStages(job)
- listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 27bf4f1599076..30075c172bdb1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -168,6 +168,10 @@ private[spark] class EventLoggingListener(
logEvent(event, flushLogger = true)
override def onApplicationEnd(event: SparkListenerApplicationEnd) =
logEvent(event, flushLogger = true)
+ override def onExecutorAdded(event: SparkListenerExecutorAdded) =
+ logEvent(event, flushLogger = true)
+ override def onExecutorRemoved(event: SparkListenerExecutorRemoved) =
+ logEvent(event, flushLogger = true)
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index b62b0c1312693..e5d1eb767e109 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{Distribution, Utils}
@@ -58,6 +59,7 @@ case class SparkListenerTaskEnd(
@DeveloperApi
case class SparkListenerJobStart(
jobId: Int,
+ time: Long,
stageInfos: Seq[StageInfo],
properties: Properties = null)
extends SparkListenerEvent {
@@ -67,7 +69,11 @@ case class SparkListenerJobStart(
}
@DeveloperApi
-case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
+case class SparkListenerJobEnd(
+ jobId: Int,
+ time: Long,
+ jobResult: JobResult)
+ extends SparkListenerEvent
@DeveloperApi
case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
@@ -84,6 +90,14 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
+@DeveloperApi
+case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
+ extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerExecutorRemoved(executorId: String)
+ extends SparkListenerEvent
+
/**
* Periodic updates from executors.
* @param execId executor id
@@ -109,7 +123,8 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent
/**
* :: DeveloperApi ::
* Interface for listening to events from the Spark scheduler. Note that this is an internal
- * interface which might change in different Spark releases.
+ * interface which might change in different Spark releases. Java clients should extend
+ * [[org.apache.spark.JavaSparkListener]] instead.
*/
@DeveloperApi
trait SparkListener {
@@ -183,6 +198,16 @@ trait SparkListener {
* Called when the driver receives task metrics from an executor in a heartbeat.
*/
def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { }
+
+ /**
+ * Called when the driver registers a new executor.
+ */
+ def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) { }
+
+ /**
+ * Called when the driver removes an executor.
+ */
+ def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }
}
/**
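
Taken together with the new timestamps on SparkListenerJobStart and SparkListenerJobEnd, a Scala listener can now time jobs and track executor membership; a minimal sketch, registered via SparkContext#addSparkListener:

    import org.apache.spark.scheduler._

    class TimingListener extends SparkListener {
      private var submitted = Map.empty[Int, Long]

      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        submitted += jobStart.jobId -> jobStart.time

      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
        submitted.get(jobEnd.jobId).foreach { start =>
          println(s"Job ${jobEnd.jobId} finished in ${jobEnd.time - start} ms")
        }

      override def onExecutorAdded(added: SparkListenerExecutorAdded): Unit =
        println(s"Executor ${added.executorId} added on ${added.executorInfo.executorHost} " +
          s"(${added.executorInfo.totalCores} cores)")

      override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit =
        println(s"Executor ${removed.executorId} removed")
    }

    // sc.addSparkListener(new TimingListener())
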
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e79ffd7a3587d..e700c6af542f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,6 +70,10 @@ private[spark] trait SparkListenerBus extends Logging {
foreachListener(_.onApplicationEnd(applicationEnd))
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
+ case executorAdded: SparkListenerExecutorAdded =>
+ foreachListener(_.onExecutorAdded(executorAdded))
+ case executorRemoved: SparkListenerExecutorRemoved =>
+ foreachListener(_.onExecutorRemoved(executorRemoved))
case SparkListenerShutdown =>
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 9fa5a09cc29d7..774f3d8cdb275 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -77,7 +77,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
(deserializedResult, size)
}
- result.metrics.incResultSize(size)
+ result.metrics.setResultSize(size)
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index fe9914b50bc54..5786d367464f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -28,7 +28,7 @@ import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
-import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
@@ -66,6 +66,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
// Number of executors requested from the cluster manager that have not registered yet
private var numPendingExecutors = 0
+ private val listenerBus = scheduler.sc.listenerBus
+
// Executors we have requested the cluster manager to kill that have not died yet
private val executorsPendingToRemove = new HashSet[String]
@@ -106,6 +108,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
+ listenerBus.post(SparkListenerExecutorAdded(executorId, data))
makeOffers()
}
@@ -213,6 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
scheduler.executorLost(executorId, SlaveLost(reason))
+ listenerBus.post(SparkListenerExecutorRemoved(executorId))
case None => logError(s"Asked to remove non-existent executor $executorId")
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index b71bd5783d6df..eb52ddfb1eab1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -31,7 +31,7 @@ import akka.actor.{Address, ActorRef}
private[cluster] class ExecutorData(
val executorActor: ActorRef,
val executorAddress: Address,
- val executorHost: String ,
+ override val executorHost: String,
var freeCores: Int,
- val totalCores: Int
-)
+ override val totalCores: Int
+) extends ExecutorInfo(executorHost, totalCores)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
new file mode 100644
index 0000000000000..b4738e64c9391
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about an executor to pass from the scheduler to SparkListeners.
+ */
+@DeveloperApi
+class ExecutorInfo(
+ val executorHost: String,
+ val totalCores: Int
+) {
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
+
+ override def equals(other: Any): Boolean = other match {
+ case that: ExecutorInfo =>
+ (that canEqual this) &&
+ executorHost == that.executorHost &&
+ totalCores == that.totalCores
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ val state = Seq(executorHost, totalCores)
+ state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 75d8ddf375e27..d252fe8595fb8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -27,9 +27,11 @@ import scala.collection.mutable.{HashMap, HashSet}
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState,
+ ExecutorInfo => MesosExecutorInfo, _}
import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
@@ -62,6 +64,9 @@ private[spark] class MesosSchedulerBackend(
var classLoader: ClassLoader = null
+ // The listener bus to publish executor added/removed events.
+ val listenerBus = sc.listenerBus
+
@volatile var appId: String = _
override def start() {
@@ -87,7 +92,7 @@ private[spark] class MesosSchedulerBackend(
}
}
- def createExecutorInfo(execId: String): ExecutorInfo = {
+ def createExecutorInfo(execId: String): MesosExecutorInfo = {
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
@@ -141,7 +146,7 @@ private[spark] class MesosSchedulerBackend(
Value.Scalar.newBuilder()
.setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.build()
- ExecutorInfo.newBuilder()
+ MesosExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
@@ -237,6 +242,7 @@ private[spark] class MesosSchedulerBackend(
}
val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
+ val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
@@ -260,6 +266,10 @@ private[spark] class MesosSchedulerBackend(
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
mesosTasks.foreach { case (slaveId, tasks) =>
+ slaveIdToWorkerOffer.get(slaveId).foreach(o =>
+ listenerBus.post(SparkListenerExecutorAdded(slaveId,
+ new ExecutorInfo(o.host, o.cores)))
+ )
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
@@ -315,7 +325,7 @@ private[spark] class MesosSchedulerBackend(
synchronized {
if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
// We lost the executor on this slave, so remember that it's gone
- slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+ removeExecutor(taskIdToSlaveId(tid))
}
if (isFinished(status.getState)) {
taskIdToSlaveId.remove(tid)
@@ -344,12 +354,20 @@ private[spark] class MesosSchedulerBackend(
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+ /**
+ * Remove the executor associated with slaveId in a thread-safe manner.
+ */
+ private def removeExecutor(slaveId: String) = {
+ synchronized {
+ listenerBus.post(SparkListenerExecutorRemoved(slaveId))
+ slaveIdsWithExecutors -= slaveId
+ }
+ }
+
private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
inClassLoader() {
logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- slaveIdsWithExecutors -= slaveId.getValue
- }
+ removeExecutor(slaveId.getValue)
scheduler.executorLost(slaveId.getValue, reason)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 26a47e8bd0ff8..8bc5a1cd18b64 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -34,10 +34,9 @@ import org.apache.spark.executor._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.network.netty.{SparkTransportConf, NettyBlockTransferService}
+import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
-import org.apache.spark.network.util.{ConfigProvider, TransportConf}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.shuffle.hash.HashShuffleManager
@@ -54,7 +53,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
- inputMetrics.incBytesRead(bytes)
+ inputMetrics.addBytesRead(bytes)
}
/**
@@ -120,7 +119,7 @@ private[spark] class BlockManager(
private[spark] var shuffleServerId: BlockManagerId = _
// Client to read other executors' shuffle files. This is either an external service, or just the
- // standard BlockTranserService to directly connect to other Executors.
+ // standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 1d1c701878447..81212708ba524 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -21,7 +21,6 @@ import scala.xml.{Node, NodeSeq}
import javax.servlet.http.HttpServletRequest
-import org.apache.spark.JobExecutionStatus
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData.JobUIData
@@ -51,13 +50,13 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
- job.startTime.map { start =>
- val end = job.endTime.getOrElse(System.currentTimeMillis())
+ job.submissionTime.map { start =>
+ val end = job.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
}
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
- val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
+ val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
val detailUrl =
"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
<tr>
@@ -68,7 +67,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
<div><em>{lastStageDescription}</em></div>
<a href={detailUrl}>{lastStageName}</a>
</td>
- <td sorttable_customkey={job.startTime.getOrElse(-1).toString}>
+ <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
{formattedSubmissionTime}
</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
@@ -101,11 +100,11 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val now = System.currentTimeMillis
val activeJobsTable =
- jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
+ jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
val completedJobsTable =
- jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+ jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
val failedJobsTable =
- jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+ jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 72935beb3a34a..b0d3bed1300b3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -153,14 +153,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val jobData: JobUIData =
new JobUIData(
jobId = jobStart.jobId,
- startTime = Some(System.currentTimeMillis),
- endTime = None,
+ submissionTime = Option(jobStart.time).filter(_ >= 0),
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
// Compute (a potential underestimate of) the number of tasks that will be run by this job.
// This may be an underestimate because the job start event references all of the result
- // stages's transitive stage dependencies, but some of these stages might be skipped if their
+ // stages' transitive stage dependencies, but some of these stages might be skipped if their
// output is available from earlier runs.
// See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
jobData.numTasks = {
@@ -186,7 +185,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
new JobUIData(jobId = jobEnd.jobId)
}
- jobData.endTime = Some(System.currentTimeMillis())
+ jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
+
jobEnd.jobResult match {
case JobSucceeded =>
completedJobs += jobData
@@ -309,7 +309,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val info = taskEnd.taskInfo
// If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
- // compeletion event is for. Let's just drop it here. This means we might have some speculation
+ // completion event is for. Let's just drop it here. This means we might have some speculation
// tasks on the web ui that's never marked as complete.
if (info != null && taskEnd.stageAttemptId != -1) {
val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 48fd7caa1a1ed..01f7e23212c3d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -40,15 +40,15 @@ private[jobs] object UIData {
class JobUIData(
var jobId: Int = -1,
- var startTime: Option[Long] = None,
- var endTime: Option[Long] = None,
+ var submissionTime: Option[Long] = None,
+ var completionTime: Option[Long] = None,
var stageIds: Seq[Int] = Seq.empty,
var jobGroup: Option[String] = None,
var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
/* Tasks */
// `numTasks` is a potential underestimate of the true number of tasks that this job will run.
// This may be an underestimate because the job start event references all of the result
- // stages's transitive stage dependencies, but some of these stages might be skipped if their
+ // stages' transitive stage dependencies, but some of these stages might be skipped if their
// output is available from earlier runs.
// See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
var numTasks: Int = 0,
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 68a46bbcb7070..f896b5072e4fa 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,6 +19,8 @@ package org.apache.spark.util
import java.util.{Properties, UUID}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+
import scala.collection.JavaConverters._
import scala.collection.Map
@@ -30,6 +32,7 @@ import org.apache.spark.executor._
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark._
/**
* Serializes SparkListener events to/from JSON. This protocol provides strong backwards-
@@ -83,7 +86,10 @@ private[spark] object JsonProtocol {
applicationStartToJson(applicationStart)
case applicationEnd: SparkListenerApplicationEnd =>
applicationEndToJson(applicationEnd)
-
+ case executorAdded: SparkListenerExecutorAdded =>
+ executorAddedToJson(executorAdded)
+ case executorRemoved: SparkListenerExecutorRemoved =>
+ executorRemovedToJson(executorRemoved)
// These aren't used, but keep the compiler happy
case SparkListenerShutdown => JNothing
case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
@@ -136,6 +142,7 @@ private[spark] object JsonProtocol {
val properties = propertiesToJson(jobStart.properties)
("Event" -> Utils.getFormattedClassName(jobStart)) ~
("Job ID" -> jobStart.jobId) ~
+ ("Submission Time" -> jobStart.time) ~
("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0
("Stage IDs" -> jobStart.stageIds) ~
("Properties" -> properties)
@@ -145,6 +152,7 @@ private[spark] object JsonProtocol {
val jobResult = jobResultToJson(jobEnd.jobResult)
("Event" -> Utils.getFormattedClassName(jobEnd)) ~
("Job ID" -> jobEnd.jobId) ~
+ ("Completion Time" -> jobEnd.time) ~
("Job Result" -> jobResult)
}
@@ -194,6 +202,16 @@ private[spark] object JsonProtocol {
("Timestamp" -> applicationEnd.time)
}
+ def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
+ ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
+ ("Executor ID" -> executorAdded.executorId) ~
+ ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
+ }
+
+ def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
+ ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
+ ("Executor ID" -> executorRemoved.executorId)
+ }
/** ------------------------------------------------------------------- *
* JSON serialization methods for classes SparkListenerEvents depend on |
@@ -362,6 +380,10 @@ private[spark] object JsonProtocol {
("Disk Size" -> blockStatus.diskSize)
}
+ def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
+ ("Host" -> executorInfo.executorHost) ~
+ ("Total Cores" -> executorInfo.totalCores)
+ }
/** ------------------------------ *
* Util JSON serialization methods |
@@ -416,6 +438,8 @@ private[spark] object JsonProtocol {
val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD)
val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart)
val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd)
+ val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
+ val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
(json \ "Event").extract[String] match {
case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -431,6 +455,8 @@ private[spark] object JsonProtocol {
case `unpersistRDD` => unpersistRDDFromJson(json)
case `applicationStart` => applicationStartFromJson(json)
case `applicationEnd` => applicationEndFromJson(json)
+ case `executorAdded` => executorAddedFromJson(json)
+ case `executorRemoved` => executorRemovedFromJson(json)
}
}
@@ -469,6 +495,8 @@ private[spark] object JsonProtocol {
def jobStartFromJson(json: JValue): SparkListenerJobStart = {
val jobId = (json \ "Job ID").extract[Int]
+ val submissionTime =
+ Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
val properties = propertiesFromJson(json \ "Properties")
// The "Stage Infos" field was added in Spark 1.2.0
@@ -476,13 +504,15 @@ private[spark] object JsonProtocol {
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
}
- SparkListenerJobStart(jobId, stageInfos, properties)
+ SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
}
def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
val jobId = (json \ "Job ID").extract[Int]
+ val completionTime =
+ Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
val jobResult = jobResultFromJson(json \ "Job Result")
- SparkListenerJobEnd(jobId, jobResult)
+ SparkListenerJobEnd(jobId, completionTime, jobResult)
}
def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
@@ -523,6 +553,16 @@ private[spark] object JsonProtocol {
SparkListenerApplicationEnd((json \ "Timestamp").extract[Long])
}
+ def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
+ val executorId = (json \ "Executor ID").extract[String]
+ val executorInfo = executorInfoFromJson(json \ "Executor Info")
+ SparkListenerExecutorAdded(executorId, executorInfo)
+ }
+
+ def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
+ val executorId = (json \ "Executor ID").extract[String]
+ SparkListenerExecutorRemoved(executorId)
+ }
/** --------------------------------------------------------------------- *
* JSON deserialization methods for classes SparkListenerEvents depend on |
@@ -593,19 +633,19 @@ private[spark] object JsonProtocol {
}
val metrics = new TaskMetrics
metrics.setHostname((json \ "Host Name").extract[String])
- metrics.incExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
- metrics.incExecutorRunTime((json \ "Executor Run Time").extract[Long])
- metrics.incResultSize((json \ "Result Size").extract[Long])
- metrics.incJvmGCTime((json \ "JVM GC Time").extract[Long])
- metrics.incResultSerializationTime((json \ "Result Serialization Time").extract[Long])
+ metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
+ metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
+ metrics.setResultSize((json \ "Result Size").extract[Long])
+ metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long])
+ metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long])
metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long])
metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])
metrics.setShuffleReadMetrics(
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
- metrics.inputMetrics =
- Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
+ metrics.setInputMetrics(
+ Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson))
metrics.outputMetrics =
Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
metrics.updatedBlocks =
@@ -638,14 +678,14 @@ private[spark] object JsonProtocol {
def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
- metrics.incBytesRead((json \ "Bytes Read").extract[Long])
+ metrics.addBytesRead((json \ "Bytes Read").extract[Long])
metrics
}
def outputMetricsFromJson(json: JValue): OutputMetrics = {
val metrics = new OutputMetrics(
DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
- metrics.incBytesWritten((json \ "Bytes Written").extract[Long])
+ metrics.setBytesWritten((json \ "Bytes Written").extract[Long])
metrics
}
@@ -745,6 +785,11 @@ private[spark] object JsonProtocol {
BlockStatus(storageLevel, memorySize, diskSize, tachyonSize)
}
+ def executorInfoFromJson(json: JValue): ExecutorInfo = {
+ val executorHost = (json \ "Host").extract[String]
+ val totalCores = (json \ "Total Cores").extract[Int]
+ new ExecutorInfo(executorHost, totalCores)
+ }
/** -------------------------------- *
* Util JSON deserialization methods |
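Taken together, the two halves above should round-trip the new executor events. A minimal sketch of that round trip; `JsonProtocol` is `private[spark]`, so this assumes code inside the `org.apache.spark` namespace:

```scala
import org.apache.spark.scheduler.SparkListenerExecutorAdded
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.JsonProtocol

val added = SparkListenerExecutorAdded("exec1", new ExecutorInfo("host1", 4))
val json = JsonProtocol.sparkEventToJson(added)
val back = JsonProtocol.sparkEventFromJson(json).asInstanceOf[SparkListenerExecutorAdded]

// Compare field by field, mirroring the suite's assertEquals helpers below:
assert(back.executorId == added.executorId)
assert(back.executorInfo.executorHost == added.executorInfo.executorHost)
assert(back.executorInfo.totalCores == added.executorInfo.totalCores)
```

The same pattern applies to `SparkListenerExecutorRemoved`, which carries only the executor ID.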
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index f8bcde12a371a..10a39990f80ce 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -17,66 +17,185 @@
package org.apache.spark.metrics
-import java.io.{FileWriter, PrintWriter, File}
+import java.io.{File, FileWriter, PrintWriter}
-import org.apache.spark.SharedSparkContext
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
+import scala.collection.mutable.ArrayBuffer
import org.scalatest.FunSuite
-import org.scalatest.Matchers
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
-import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.SharedSparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.util.Utils
-class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Matchers {
+
+class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
- test("input metrics when reading text file with single split") {
- val file = new File(getClass.getSimpleName + ".txt")
- val pw = new PrintWriter(new FileWriter(file))
- pw.println("some stuff")
- pw.println("some other stuff")
- pw.println("yet more stuff")
- pw.println("too much stuff")
+ @transient var tmpDir: File = _
+ @transient var tmpFile: File = _
+ @transient var tmpFilePath: String = _
+
+ override def beforeAll() {
+ super.beforeAll()
+
+ tmpDir = Utils.createTempDir()
+ val testTempDir = new File(tmpDir, "test")
+ testTempDir.mkdir()
+
+ tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
+ val pw = new PrintWriter(new FileWriter(tmpFile))
+ for (x <- 1 to 1000000) {
+ pw.println("s")
+ }
pw.close()
- file.deleteOnExit()
- val taskBytesRead = new ArrayBuffer[Long]()
- sc.addSparkListener(new SparkListener() {
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
- }
- })
- sc.textFile("file://" + file.getAbsolutePath, 2).count()
+ // Path to tmpFile
+ tmpFilePath = "file://" + tmpFile.getAbsolutePath
+ }
- // Wait for task end events to come in
- sc.listenerBus.waitUntilEmpty(500)
- assert(taskBytesRead.length == 2)
- assert(taskBytesRead.sum >= file.length())
+ override def afterAll() {
+ super.afterAll()
+ Utils.deleteRecursively(tmpDir)
}
- test("input metrics when reading text file with multiple splits") {
- val file = new File(getClass.getSimpleName + ".txt")
- val pw = new PrintWriter(new FileWriter(file))
- for (i <- 0 until 10000) {
- pw.println("some stuff")
+ test("input metrics for old hadoop with coalesce") {
+ val bytesRead = runAndReturnBytesRead {
+ sc.textFile(tmpFilePath, 4).count()
+ }
+ val bytesRead2 = runAndReturnBytesRead {
+ sc.textFile(tmpFilePath, 4).coalesce(2).count()
+ }
+ assert(bytesRead != 0)
+ assert(bytesRead == bytesRead2)
+ assert(bytesRead2 >= tmpFile.length())
+ }
+
+ test("input metrics with cache and coalesce") {
+ // prime the cache manager
+ val rdd = sc.textFile(tmpFilePath, 4).cache()
+ rdd.collect()
+
+ val bytesRead = runAndReturnBytesRead {
+ rdd.count()
+ }
+ val bytesRead2 = runAndReturnBytesRead {
+ rdd.coalesce(4).count()
}
- pw.close()
- file.deleteOnExit()
+ // For count and coalesce, the same bytes should be read.
+ assert(bytesRead != 0)
+ assert(bytesRead2 == bytesRead)
+ }
+
+ /**
+ * This checks the situation where we have interleaved reads from
+ * different sources. Currently, we only accumulate from the first
+ * read method we find in the task. This test uses cartesian to create
+ * the interleaved reads.
+ *
+ * Once https://issues.apache.org/jira/browse/SPARK-5225 is fixed
+ * this test should break.
+ */
+ test("input metrics with mixed read method") {
+ // prime the cache manager
+ val numPartitions = 2
+ val rdd = sc.parallelize(1 to 100, numPartitions).cache()
+ rdd.collect()
+
+ val rdd2 = sc.textFile(tmpFilePath, numPartitions)
+
+ val bytesRead = runAndReturnBytesRead {
+ rdd.count()
+ }
+ val bytesRead2 = runAndReturnBytesRead {
+ rdd2.count()
+ }
+
+ val cartRead = runAndReturnBytesRead {
+ rdd.cartesian(rdd2).count()
+ }
+
+ assert(cartRead != 0)
+ assert(bytesRead != 0)
+ // We read from the first rdd of the cartesian once per partition.
+ assert(cartRead == bytesRead * numPartitions)
+ }
+
+ test("input metrics for new Hadoop API with coalesce") {
+ val bytesRead = runAndReturnBytesRead {
+ sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
+ classOf[Text]).count()
+ }
+ val bytesRead2 = runAndReturnBytesRead {
+ sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
+ classOf[Text]).coalesce(5).count()
+ }
+ assert(bytesRead != 0)
+ assert(bytesRead2 == bytesRead)
+ assert(bytesRead >= tmpFile.length())
+ }
+
+ test("input metrics when reading text file") {
+ val bytesRead = runAndReturnBytesRead {
+ sc.textFile(tmpFilePath, 2).count()
+ }
+ assert(bytesRead >= tmpFile.length())
+ }
+
+ test("input metrics with interleaved reads") {
+ val numPartitions = 2
+ val cartVector = 0 to 9
+ val cartFile = new File(tmpDir, getClass.getSimpleName + "_cart.txt")
+ val cartFilePath = "file://" + cartFile.getAbsolutePath
+
+ // write files to disk so we can read them later.
+ sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
+ val aRdd = sc.textFile(cartFilePath, numPartitions)
+
+ val tmpRdd = sc.textFile(tmpFilePath, numPartitions)
+
+ val firstSize = runAndReturnBytesRead {
+ aRdd.count()
+ }
+ val secondSize = runAndReturnBytesRead {
+ tmpRdd.count()
+ }
+
+ val cartesianBytes = runAndReturnBytesRead {
+ aRdd.cartesian(tmpRdd).count()
+ }
+
+ // Computing the number of bytes read for a cartesian operation is a little involved.
+ // Cartesian interleaves reads between two partitions, e.g. p1 and p2.
+ // Here are the steps:
+ // 1) First it creates an iterator for p1
+ // 2) Creates an iterator for p2
+ // 3) Reads the first element of p1 and then all the elements of p2
+ // 4) Proceeds to the next element of p1
+ // 5) Creates a new iterator for p2
+ // 6) Rinse and repeat.
+ // As a result we read from the second partition n times, where n is the number of keys in
+ // p1. Hence the arithmetic in the assertion below; a worked example with concrete numbers
+ // follows this test.
+ assert(cartesianBytes != 0)
+ assert(cartesianBytes == firstSize * numPartitions + (cartVector.length * secondSize))
+ }
+
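To make that accounting concrete, here is the same arithmetic with made-up sizes (illustrative numbers only, not values the test actually measures):

```scala
// Suppose one full pass over aRdd reads 100 bytes, one full pass over tmpRdd
// reads 1000 bytes, with 2 partitions per side and 10 keys in cartVector.
val firstSize = 100L   // bytes per complete read of aRdd
val secondSize = 1000L // bytes per complete read of tmpRdd
val numPartitions = 2
val numKeys = 10       // cartVector.length

// aRdd is scanned once per partition of tmpRdd; tmpRdd is re-scanned once
// per element of aRdd:
val expected = firstSize * numPartitions + numKeys * secondSize
assert(expected == 10200L)
```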
+ private def runAndReturnBytesRead(job: => Unit): Long = {
val taskBytesRead = new ArrayBuffer[Long]()
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
}
})
- sc.textFile("file://" + file.getAbsolutePath, 2).count()
- // Wait for task end events to come in
+ job
+
sc.listenerBus.waitUntilEmpty(500)
- assert(taskBytesRead.length == 2)
- assert(taskBytesRead.sum >= file.length())
+ taskBytesRead.sum
}
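The helper's `job: => Unit` parameter is call-by-name, which is what lets the listener be registered before the measured job runs. A tiny standalone illustration of the mechanism (hypothetical, not part of the suite):

```scala
// The block is not evaluated at the call site; it runs only where `body`
// is referenced, i.e. after the timer has started.
def timed(body: => Unit): Long = {
  val start = System.nanoTime()
  body
  System.nanoTime() - start
}

val elapsedNanos = timed { Thread.sleep(10) }
assert(elapsedNanos > 0)
```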
test("output metrics when writing text file") {
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index 1b112f1a41ca9..cd193ae4f5238 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -76,6 +76,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices(0).mkString(",") === (0 to 32).mkString(","))
assert(slices(1).mkString(",") === (33 to 66).mkString(","))
assert(slices(2).mkString(",") === (67 to 100).mkString(","))
+ assert(slices(2).isInstanceOf[Range.Inclusive])
}
test("empty data") {
@@ -227,4 +228,28 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
+
+ test("inclusive ranges with Int.MaxValue and Int.MinValue") {
+ val data1 = 1 to Int.MaxValue
+ val slices1 = ParallelCollectionRDD.slice(data1, 3)
+ assert(slices1.size === 3)
+ assert(slices1.map(_.size).sum === Int.MaxValue)
+ assert(slices1(2).isInstanceOf[Range.Inclusive])
+ val data2 = -2 to Int.MinValue by -1
+ val slices2 = ParallelCollectionRDD.slice(data2, 3)
+ assert(slices2.size == 3)
+ assert(slices2.map(_.size).sum === Int.MaxValue)
+ assert(slices2(2).isInstanceOf[Range.Inclusive])
+ }
+
+ test("empty ranges with Int.MaxValue and Int.MinValue") {
+ val data1 = Int.MaxValue until Int.MaxValue
+ val slices1 = ParallelCollectionRDD.slice(data1, 5)
+ assert(slices1.size === 5)
+ for (i <- 0 until 5) assert(slices1(i).size === 0)
+ val data2 = Int.MinValue until Int.MinValue
+ val slices2 = ParallelCollectionRDD.slice(data2, 5)
+ assert(slices2.size === 5)
+ for (i <- 0 until 5) assert(slices2(i).size === 0)
+ }
}
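For reference, the slicing contract these tests pin down, shown on a small range. `ParallelCollectionRDD` is `private[spark]`, so this assumes code in the `org.apache.spark.rdd` package, as the suite has:

```scala
import org.apache.spark.rdd.ParallelCollectionRDD

val slices = ParallelCollectionRDD.slice(1 to 10, 3)
// Split positions are [0,3), [3,6), [6,10]; only the last slice is built as an
// inclusive range, which is what avoids overflow when the bound is Int.MaxValue.
assert(slices.map(_.size).sum == 10)
assert(slices.last == (7 to 10))
```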
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 1de7e130039a5..437d8693c0b1f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -160,7 +160,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
*/
private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
val conf = getLoggingConf(testDirPath, compressionCodec)
- val sc = new SparkContext("local", "test", conf)
+ val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val expectedLogDir = testDir.toURI().toString()
@@ -184,6 +184,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
val eventSet = mutable.Set(
SparkListenerApplicationStart,
SparkListenerBlockManagerAdded,
+ SparkListenerExecutorAdded,
SparkListenerEnvironmentUpdate,
SparkListenerJobStart,
SparkListenerJobEnd,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 24f41bf8cccda..0fb1bdd30d975 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -34,6 +34,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
+ val jobCompletionTime = 1421191296660L
+
before {
sc = new SparkContext("local", "SparkListenerSuite")
}
@@ -44,7 +46,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
bus.addListener(counter)
// Listener bus hasn't started yet, so posting events should not increment counter
- (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+ (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(counter.count === 0)
// Starting listener bus should flush all buffered events
@@ -54,7 +56,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
// After listener bus has stopped, posting events should not increment counter
bus.stop()
- (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+ (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(counter.count === 5)
// Listener bus must not be started twice
@@ -99,7 +101,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
bus.addListener(blockingListener)
bus.start()
- bus.post(SparkListenerJobEnd(0, JobSucceeded))
+ bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
// Listener should be blocked after start
@@ -345,7 +347,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
bus.start()
// Post events to all listeners, and wait until the queue is drained
- (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+ (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
// The exception should be caught, and the event should be propagated to other listeners
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
new file mode 100644
index 0000000000000..623a687c359a2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.{SparkContext, LocalSparkContext}
+
+import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll}
+
+import scala.collection.mutable
+
+/**
+ * Unit tests for SparkListener that require a local cluster.
+ */
+class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext
+ with BeforeAndAfter with BeforeAndAfterAll {
+
+ /** Length of time to wait while draining listener events. */
+ val WAIT_TIMEOUT_MILLIS = 10000
+
+ before {
+ sc = new SparkContext("local-cluster[2,1,512]", "SparkListenerSuite")
+ }
+
+ test("SparkListener sends executor added message") {
+ val listener = new SaveExecutorInfo
+ sc.addSparkListener(listener)
+
+ val rdd1 = sc.parallelize(1 to 100, 4)
+ val rdd2 = rdd1.map(_.toString)
+ rdd2.setName("Target RDD")
+ rdd2.count()
+
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(listener.addedExecutorInfo.size == 2)
+ assert(listener.addedExecutorInfo("0").totalCores == 1)
+ assert(listener.addedExecutorInfo("1").totalCores == 1)
+ }
+
+ private class SaveExecutorInfo extends SparkListener {
+ val addedExecutorInfo = mutable.Map[String, ExecutorInfo]()
+
+ override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
+ addedExecutorInfo(executor.executorId) = executor.executorInfo
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index 48f5e40f506d9..78a30a40bf19a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -18,17 +18,20 @@
package org.apache.spark.scheduler.mesos
import org.scalatest.FunSuite
-import org.apache.spark.{scheduler, SparkConf, SparkContext, LocalSparkContext}
-import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
+import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus,
+ TaskDescription, WorkerOffer, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}
import org.apache.mesos.SchedulerDriver
-import org.apache.mesos.Protos._
-import org.scalatest.mock.EasyMockSugar
+import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, _}
import org.apache.mesos.Protos.Value.Scalar
import org.easymock.{Capture, EasyMock}
import java.nio.ByteBuffer
import java.util.Collections
import java.util
+import org.scalatest.mock.EasyMockSugar
+
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -52,11 +55,16 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
val driver = EasyMock.createMock(classOf[SchedulerDriver])
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
+ val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
+ listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+ EasyMock.replay(listenerBus)
+
val sc = EasyMock.createMock(classOf[SparkContext])
EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
+ EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
EasyMock.replay(sc)
val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 6389bd13cd85a..68074ae32a672 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.util.Utils
class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
+ val jobSubmissionTime = 1421191042750L
+ val jobCompletionTime = 1421191296660L
private def createStageStartEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
@@ -46,12 +48,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val stageInfos = stageIds.map { stageId =>
new StageInfo(stageId, 0, stageId.toString, 0, null, "")
}
- SparkListenerJobStart(jobId, stageInfos)
+ SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
}
private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
- SparkListenerJobEnd(jobId, result)
+ SparkListenerJobEnd(jobId, jobCompletionTime, result)
}
private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {
@@ -227,15 +229,15 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
- taskMetrics.incExecutorRunTime(base + 4)
+ taskMetrics.setExecutorRunTime(base + 4)
taskMetrics.incDiskBytesSpilled(base + 5)
taskMetrics.incMemoryBytesSpilled(base + 6)
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- taskMetrics.inputMetrics = Some(inputMetrics)
- inputMetrics.incBytesRead(base + 7)
+ taskMetrics.setInputMetrics(Some(inputMetrics))
+ inputMetrics.addBytesRead(base + 7)
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
taskMetrics.outputMetrics = Some(outputMetrics)
- outputMetrics.incBytesWritten(base + 8)
+ outputMetrics.setBytesWritten(base + 8)
taskMetrics
}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 25693ee866487..0357fc6ce2780 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
import java.util.Properties
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.shuffle.MetadataFetchFailedException
import scala.collection.Map
@@ -33,6 +34,9 @@ import org.apache.spark.storage._
class JsonProtocolSuite extends FunSuite {
+ val jobSubmissionTime = 1421191042750L
+ val jobCompletionTime = 1421191296660L
+
test("SparkListenerEvent") {
val stageSubmitted =
SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
@@ -53,9 +57,9 @@ class JsonProtocolSuite extends FunSuite {
val stageIds = Seq[Int](1, 2, 3, 4)
val stageInfos = stageIds.map(x =>
makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
- SparkListenerJobStart(10, stageInfos, properties)
+ SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
}
- val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
+ val jobEnd = SparkListenerJobEnd(20, jobCompletionTime, JobSucceeded)
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
"JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")),
"Spark Properties" -> Seq(("Job throughput", "80000 jobs/s, regardless of job type")),
@@ -69,6 +73,9 @@ class JsonProtocolSuite extends FunSuite {
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
+ val executorAdded = SparkListenerExecutorAdded("exec1",
+ new ExecutorInfo("Hostee.awesome.com", 11))
+ val executorRemoved = SparkListenerExecutorRemoved("exec2")
testEvent(stageSubmitted, stageSubmittedJsonString)
testEvent(stageCompleted, stageCompletedJsonString)
@@ -85,6 +92,8 @@ class JsonProtocolSuite extends FunSuite {
testEvent(unpersistRdd, unpersistRDDJsonString)
testEvent(applicationStart, applicationStartJsonString)
testEvent(applicationEnd, applicationEndJsonString)
+ testEvent(executorAdded, executorAddedJsonString)
+ testEvent(executorRemoved, executorRemovedJsonString)
}
test("Dependent Classes") {
@@ -94,6 +103,7 @@ class JsonProtocolSuite extends FunSuite {
testTaskMetrics(makeTaskMetrics(
33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
+ testExecutorInfo(new ExecutorInfo("host", 43))
// StorageLevel
testStorageLevel(StorageLevel.NONE)
@@ -240,13 +250,31 @@ class JsonProtocolSuite extends FunSuite {
val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
val dummyStageInfos =
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
- val jobStart = SparkListenerJobStart(10, stageInfos, properties)
+ val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
val expectedJobStart =
- SparkListenerJobStart(10, dummyStageInfos, properties)
+ SparkListenerJobStart(10, jobSubmissionTime, dummyStageInfos, properties)
assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
}
+ test("SparkListenerJobStart and SparkListenerJobEnd backward compatibility") {
+ // Prior to Spark 1.3.0, SparkListenerJobStart did not have a "Submission Time" property.
+ // Also, SparkListenerJobEnd did not have a "Completion Time" property.
+ val stageIds = Seq[Int](1, 2, 3, 4)
+ val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40, x * 50))
+ val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, properties)
+ val oldStartEvent = JsonProtocol.jobStartToJson(jobStart)
+ .removeField({ _._1 == "Submission Time"})
+ val expectedJobStart = SparkListenerJobStart(11, -1, stageInfos, properties)
+ assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldStartEvent))
+
+ val jobEnd = SparkListenerJobEnd(11, jobCompletionTime, JobSucceeded)
+ val oldEndEvent = JsonProtocol.jobEndToJson(jobEnd)
+ .removeField({ _._1 == "Completion Time"})
+ val expectedJobEnd = SparkListenerJobEnd(11, -1, JobSucceeded)
+ assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
+ }
+
/** -------------------------- *
| Helper test running methods |
* --------------------------- */
@@ -303,6 +331,10 @@ class JsonProtocolSuite extends FunSuite {
assert(blockId === newBlockId)
}
+ private def testExecutorInfo(info: ExecutorInfo) {
+ val newInfo = JsonProtocol.executorInfoFromJson(JsonProtocol.executorInfoToJson(info))
+ assertEquals(info, newInfo)
+ }
/** -------------------------------- *
| Util methods for comparing events |
@@ -335,6 +367,11 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(e1.jobResult, e2.jobResult)
case (e1: SparkListenerEnvironmentUpdate, e2: SparkListenerEnvironmentUpdate) =>
assertEquals(e1.environmentDetails, e2.environmentDetails)
+ case (e1: SparkListenerExecutorAdded, e2: SparkListenerExecutorAdded) =>
+ assert(e1.executorId == e2.executorId)
+ assertEquals(e1.executorInfo, e2.executorInfo)
+ case (e1: SparkListenerExecutorRemoved, e2: SparkListenerExecutorRemoved) =>
+ assert(e1.executorId == e2.executorId)
case (e1, e2) =>
assert(e1 === e2)
case _ => fail("Events don't match in types!")
@@ -387,6 +424,11 @@ class JsonProtocolSuite extends FunSuite {
assert(info1.accumulables === info2.accumulables)
}
+ private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
+ assert(info1.executorHost == info2.executorHost)
+ assert(info1.totalCores == info2.totalCores)
+ }
+
private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
assert(metrics1.hostname === metrics2.hostname)
assert(metrics1.executorDeserializeTime === metrics2.executorDeserializeTime)
@@ -600,17 +642,17 @@ class JsonProtocolSuite extends FunSuite {
hasOutput: Boolean) = {
val t = new TaskMetrics
t.setHostname("localhost")
- t.incExecutorDeserializeTime(a)
- t.incExecutorRunTime(b)
- t.incResultSize(c)
- t.incJvmGCTime(d)
- t.incResultSerializationTime(a + b)
+ t.setExecutorDeserializeTime(a)
+ t.setExecutorRunTime(b)
+ t.setResultSize(c)
+ t.setJvmGCTime(d)
+ t.setResultSerializationTime(a + b)
t.incMemoryBytesSpilled(a + c)
if (hasHadoopInput) {
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- inputMetrics.incBytesRead(d + e + f)
- t.inputMetrics = Some(inputMetrics)
+ inputMetrics.addBytesRead(d + e + f)
+ t.setInputMetrics(Some(inputMetrics))
} else {
val sr = new ShuffleReadMetrics
sr.incRemoteBytesRead(b + d)
@@ -621,7 +663,7 @@ class JsonProtocolSuite extends FunSuite {
}
if (hasOutput) {
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
- outputMetrics.incBytesWritten(a + b + c)
+ outputMetrics.setBytesWritten(a + b + c)
t.outputMetrics = Some(outputMetrics)
} else {
val sw = new ShuffleWriteMetrics
@@ -1054,6 +1096,7 @@ class JsonProtocolSuite extends FunSuite {
|{
| "Event": "SparkListenerJobStart",
| "Job ID": 10,
+ | "Submission Time": 1421191042750,
| "Stage Infos": [
| {
| "Stage ID": 1,
@@ -1328,6 +1371,7 @@ class JsonProtocolSuite extends FunSuite {
|{
| "Event": "SparkListenerJobEnd",
| "Job ID": 20,
+ | "Completion Time": 1421191296660,
| "Job Result": {
| "Result": "JobSucceeded"
| }
@@ -1407,4 +1451,24 @@ class JsonProtocolSuite extends FunSuite {
| "Timestamp": 42
|}
"""
+
+ private val executorAddedJsonString =
+ """
+ |{
+ | "Event": "SparkListenerExecutorAdded",
+ | "Executor ID": "exec1",
+ | "Executor Info": {
+ | "Host": "Hostee.awesome.com",
+ | "Total Cores": 11
+ | }
+ |}
+ """
+
+ private val executorRemovedJsonString =
+ """
+ |{
+ | "Event": "SparkListenerExecutorRemoved",
+ | "Executor ID": "exec2"
+ |}
+ """
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 673cdb371a512..efbab4085317a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -102,11 +102,10 @@ of the most common options to set are:
-<td><code>spark.executor.memory</code></td>
-<td>512m</td>
+<td><code>spark.driver.cores</code></td>
+<td>1</td>
<td>
-Amount of memory to use per executor process, in the same format as JVM memory strings
-(e.g. <code>512m</code>, <code>2g</code>).
+Number of cores to use for the driver process, only in cluster mode.
</td>
@@ -117,6 +116,14 @@ of the most common options to set are:
(e.g. <code>512m</code>, <code>2g</code>).
</td>
</tr>
+<tr>
+<td><code>spark.executor.memory</code></td>
+<td>512m</td>
+<td>
+Amount of memory to use per executor process, in the same format as JVM memory strings
+(e.g. <code>512m</code>, <code>2g</code>).
+</td>
+</tr>
<tr>
<td><code>spark.driver.maxResultSize</code></td>
<td>1g</td>
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 5e0d5c15d7069..0211bbabc1132 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -913,7 +913,7 @@ for details.
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
-<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable&lt;V&gt;, Iterable&lt;W&gt;) tuples. This operation is also called <code>groupWith</code>. </td>
+<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) tuples. This operation is also called <code>groupWith</code>. </td>
</tr>
<tr>
<td> <b>cartesian</b>(<i>otherDataset</i>) </td>
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4f273098c5db3..68ab127bcf087 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -29,6 +29,23 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
In cluster mode, use <code>spark.driver.memory</code> instead.
</td>
</tr>
+<tr>
+<td><code>spark.driver.cores</code></td>
+<td>1</td>
+<td>
+Number of cores used by the driver in YARN cluster mode.
+Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN AM.
+In client mode, use <code>spark.yarn.am.cores</code> to control the number of cores used by the YARN AM instead.
+</td>
+</tr>
+<tr>
+<td><code>spark.yarn.am.cores</code></td>
+<td>1</td>
+<td>
+Number of cores to use for the YARN Application Master in client mode.
+In cluster mode, use <code>spark.driver.cores</code> instead.
+</td>
+</tr>
<tr>
<td><code>spark.yarn.am.waitTime</code></td>
<td>100000</td>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 032106371cd60..d4eeccf64275f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,6 +127,7 @@ private[spark] class Client(
}
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
+ capability.setVirtualCores(args.amCores)
appContext.setResource(capability)
appContext
}
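The one-line change above rides on the standard YARN resource API. In isolation, the AM container request now looks roughly like this (a sketch with hypothetical values, not Spark's full submission path):

```scala
import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.util.Records

val amMemory = 512         // hypothetical args.amMemory, in MB
val amMemoryOverhead = 384 // hypothetical overhead, in MB
val amCores = 2            // hypothetical args.amCores

val capability = Records.newRecord(classOf[Resource])
capability.setMemory(amMemory + amMemoryOverhead) // MB
capability.setVirtualCores(amCores)               // the line added above
```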
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 461a9ccd3c216..79bead77ba6e4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -36,14 +36,18 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var numExecutors = DEFAULT_NUMBER_EXECUTORS
var amQueue = sparkConf.get("spark.yarn.queue", "default")
var amMemory: Int = 512 // MB
+ var amCores: Int = 1
var appName: String = "Spark"
var priority = 0
def isClusterMode: Boolean = userClass != null
private var driverMemory: Int = 512 // MB
+ private var driverCores: Int = 1
private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
private val amMemKey = "spark.yarn.am.memory"
private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
+ private val driverCoresKey = "spark.driver.cores"
+ private val amCoresKey = "spark.yarn.am.cores"
private val isDynamicAllocationEnabled =
sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
@@ -92,19 +96,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
"You must specify at least 1 executor!\n" + getUsageMessage())
}
if (isClusterMode) {
- for (key <- Seq(amMemKey, amMemOverheadKey)) {
+ for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
if (sparkConf.contains(key)) {
println(s"$key is set but does not apply in cluster mode.")
}
}
amMemory = driverMemory
+ amCores = driverCores
} else {
- if (sparkConf.contains(driverMemOverheadKey)) {
- println(s"$driverMemOverheadKey is set but does not apply in client mode.")
+ for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
+ if (sparkConf.contains(key)) {
+ println(s"$key is set but does not apply in client mode.")
+ }
}
sparkConf.getOption(amMemKey)
.map(Utils.memoryStringToMb)
.foreach { mem => amMemory = mem }
+ sparkConf.getOption(amCoresKey)
+ .map(_.toInt)
+ .foreach { cores => amCores = cores }
}
}
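Putting the two modes together: in cluster mode `--driver-cores` (equivalently `spark.driver.cores`) feeds `amCores`, while in client mode only `spark.yarn.am.cores` applies. A hedged sketch of the cluster-mode path; `ClientArguments` is `private[spark]`, and the flag names come from the usage message below:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.ClientArguments

val conf = new SparkConf()
val args = new ClientArguments(
  Array("--jar", "app.jar", "--class", "org.example.App", "--driver-cores", "2"),
  conf)

// Passing --class makes this cluster mode, so driverCores is copied into
// amCores and ultimately into capability.setVirtualCores(...) in Client.scala.
assert(args.amCores == 2)
```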
@@ -140,6 +150,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
driverMemory = value
args = tail
+ case ("--driver-cores") :: IntParam(value) :: tail =>
+ driverCores = value
+ args = tail
+
case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
@@ -198,7 +212,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
private def getUsageMessage(unknownParam: List[String] = null): String = {
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
- message + """
+ message +
+ """
|Usage: org.apache.spark.deploy.yarn.Client [options]
|Options:
| --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster
@@ -209,6 +224,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| --num-executors NUM Number of executors to start (Default: 2)
| --executor-cores NUM Number of cores for the executors (Default: 1).
| --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
+ | --driver-cores NUM Number of cores used by the driver (Default: 1).
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
| --name NAME The name of your application (Default: Spark)
| --queue QUEUE The hadoop queue to use for allocation requests (Default: