[SPARK-26260][CORE] For disk store tasks summary table should show only successful tasks summary

### What changes were proposed in this pull request?

After #23088, the task summary table on the stage page shows only successful tasks' metrics when an `InMemoryStore` is used. This PR adds the same behavior for the disk store.
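
For context, the trick that makes this cheap for a disk store (visible in the diff below) is to persist the metrics of non-successful tasks as negative values, so that an index scan starting at `0` only ever sees successful tasks. A minimal sketch of the idea, using hypothetical names (`StoredTask`, `encodeRunTime`) rather than the real Spark classes:

```scala
// Illustrative sketch only -- not the actual Spark code path.
case class StoredTask(status: String, executorRunTime: Long)

// Non-successful tasks get a negative (shifted) value, mirroring makeNegative in the diff.
def encodeRunTime(t: StoredTask): Long =
  if (t.status == "SUCCESS") t.executorRunTime else -t.executorRunTime - 1L

val tasks = Seq(
  StoredTask("SUCCESS", 120L),
  StoredTask("FAILED", 0L),      // encodes to -1, so it is excluded below
  StoredTask("SUCCESS", 45L))

// Mimics store.view(...).index(EXEC_RUN_TIME).first(0L): only non-negative values are scanned.
val successfulRunTimes = tasks.map(encodeRunTime).filter(_ >= 0L)
// successfulRunTimes == List(120, 45)
```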

### Why are the changes needed?

The in-memory and disk stores now behave consistently when rendering the task summary table in the UI, even when a stage contains non-successful tasks.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added unit tests, and verified manually.

Test steps:
1. Add the config to spark-defaults.conf: **spark.history.store.path /tmp/store**
2. sbin/start-history-server.sh
3. bin/spark-shell
4. `sc.parallelize(1 to 1000, 2).map(x => throw new Exception("fail")).count`

![Screenshot 2019-11-14 at 3 51 39 AM](https://user-images.githubusercontent.com/23054875/68809546-268d2e80-0692-11ea-8b2c-bee767478135.png)

Closes #26508 from shahidki31/task.

Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
shahidki31 authored and Marcelo Vanzin committed Nov 25, 2019
1 parent 29ebd93 commit bec2068
Showing 4 changed files with 234 additions and 138 deletions.
82 changes: 23 additions & 59 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -136,12 +136,6 @@ private[spark] class AppStatusStore(
store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
}

// SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
// but currently this is very expensive when using a disk store. So we only trigger the slower
// code path when we know we have all data in memory. The following method checks whether all
// the data will be in memory.
private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined

/**
* Calculates a summary of the task metrics for the given stage attempt, returning the
* requested quantiles for the recorded metrics.
@@ -162,21 +156,11 @@ private[spark] class AppStatusStore(
// cheaper for disk stores (avoids deserialization).
val count = {
Utils.tryWithResource(
if (isInMemoryStore) {
// For Live UI, we should count the tasks with status "SUCCESS" only.
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.STATUS)
.first("SUCCESS")
.last("SUCCESS")
.closeableIterator()
} else {
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.EXEC_RUN_TIME)
.first(0L)
.closeableIterator()
}
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.EXEC_RUN_TIME)
.first(0L)
.closeableIterator()
) { it =>
var _count = 0L
while (it.hasNext()) {
@@ -245,50 +229,30 @@ private[spark] class AppStatusStore(
// stabilize once the stage finishes. It's also slow, especially with disk stores.
val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

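As an aside (not part of the diff), here is what the index computation above yields for a stage with 1000 successful tasks; the `math.min` clamp only matters for the 1.0 quantile:

```scala
// Worked example of the quantile-to-index mapping used above (illustrative only).
val count = 1000L
val quantiles = Seq(0.0, 0.25, 0.5, 0.75, 1.0)
val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
// indices == List(0, 250, 500, 750, 999)  -- 1.0 * 1000 = 1000 is clamped to 999
```
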
// TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119).
// For InMemory case, it is efficient to find using the following code. But for diskStore case
// we need an efficient solution to avoid deserialization time overhead. For that, we need to
// rework on the way indexing works, so that we can index by specific metrics for successful
// and failed tasks differently (would be tricky). Also would require changing the disk store
// version (to invalidate old stores).
def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
if (isInMemoryStore) {
val quantileTasks = store.view(classOf[TaskDataWrapper])
Utils.tryWithResource(
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(index)
.first(0L)
.asScala
.filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks
.toIndexedSeq

indices.map { index =>
fn(quantileTasks(index.toInt)).toDouble
}.toIndexedSeq
} else {
Utils.tryWithResource(
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(index)
.first(0L)
.closeableIterator()
) { it =>
var last = Double.NaN
var currentIdx = -1L
indices.map { idx =>
if (idx == currentIdx) {
.closeableIterator()
) { it =>
var last = Double.NaN
var currentIdx = -1L
indices.map { idx =>
if (idx == currentIdx) {
last
} else {
val diff = idx - currentIdx
currentIdx = idx
if (it.skip(diff - 1)) {
last = fn(it.next()).toDouble
last
} else {
val diff = idx - currentIdx
currentIdx = idx
if (it.skip(diff - 1)) {
last = fn(it.next()).toDouble
last
} else {
Double.NaN
}
Double.NaN
}
}.toIndexedSeq
}
}
}.toIndexedSeq
}
}

@@ -582,7 +546,7 @@ private[spark] class AppStatusStore(

private[spark] object AppStatusStore {

val CURRENT_VERSION = 1L
val CURRENT_VERSION = 2L

/**
* Create an in-memory store for a live application.
102 changes: 78 additions & 24 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -184,6 +184,19 @@ private class LiveTask(
info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis()))
}

val hasMetrics = metrics.executorDeserializeTime >= 0

/**
* SPARK-26260: For non successful tasks, store the metrics as negative to avoid
* the calculation in the task summary. `toApi` method in the `TaskDataWrapper` will make
* it actual value.
*/
val taskMetrics: v1.TaskMetrics = if (hasMetrics && !info.successful) {
makeNegative(metrics)
} else {
metrics
}

new TaskDataWrapper(
info.taskId,
info.index,
@@ -199,30 +212,31 @@ private class LiveTask(
newAccumulatorInfos(info.accumulables),
errorMessage,

metrics.executorDeserializeTime,
metrics.executorDeserializeCpuTime,
metrics.executorRunTime,
metrics.executorCpuTime,
metrics.resultSize,
metrics.jvmGcTime,
metrics.resultSerializationTime,
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,
metrics.peakExecutionMemory,
metrics.inputMetrics.bytesRead,
metrics.inputMetrics.recordsRead,
metrics.outputMetrics.bytesWritten,
metrics.outputMetrics.recordsWritten,
metrics.shuffleReadMetrics.remoteBlocksFetched,
metrics.shuffleReadMetrics.localBlocksFetched,
metrics.shuffleReadMetrics.fetchWaitTime,
metrics.shuffleReadMetrics.remoteBytesRead,
metrics.shuffleReadMetrics.remoteBytesReadToDisk,
metrics.shuffleReadMetrics.localBytesRead,
metrics.shuffleReadMetrics.recordsRead,
metrics.shuffleWriteMetrics.bytesWritten,
metrics.shuffleWriteMetrics.writeTime,
metrics.shuffleWriteMetrics.recordsWritten,
hasMetrics,
taskMetrics.executorDeserializeTime,
taskMetrics.executorDeserializeCpuTime,
taskMetrics.executorRunTime,
taskMetrics.executorCpuTime,
taskMetrics.resultSize,
taskMetrics.jvmGcTime,
taskMetrics.resultSerializationTime,
taskMetrics.memoryBytesSpilled,
taskMetrics.diskBytesSpilled,
taskMetrics.peakExecutionMemory,
taskMetrics.inputMetrics.bytesRead,
taskMetrics.inputMetrics.recordsRead,
taskMetrics.outputMetrics.bytesWritten,
taskMetrics.outputMetrics.recordsWritten,
taskMetrics.shuffleReadMetrics.remoteBlocksFetched,
taskMetrics.shuffleReadMetrics.localBlocksFetched,
taskMetrics.shuffleReadMetrics.fetchWaitTime,
taskMetrics.shuffleReadMetrics.remoteBytesRead,
taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk,
taskMetrics.shuffleReadMetrics.localBytesRead,
taskMetrics.shuffleReadMetrics.recordsRead,
taskMetrics.shuffleWriteMetrics.bytesWritten,
taskMetrics.shuffleWriteMetrics.writeTime,
taskMetrics.shuffleWriteMetrics.recordsWritten,

stageId,
stageAttemptId)
@@ -710,6 +724,46 @@ private object LiveEntityHelpers {
addMetrics(m1, m2, -1)
}

/**
* Convert all the metric values to negative as well as handle zero values.
* This method assumes that all the metric values are greater than or equal to zero
*/
def makeNegative(m: v1.TaskMetrics): v1.TaskMetrics = {
// To handle 0 metric value, add 1 and make the metric negative.
// To recover actual value do `math.abs(metric + 1)`
// Eg: if the metric values are (5, 3, 0, 1) => Updated metric values will be (-6, -4, -1, -2)
// To get actual metric value, do math.abs(metric + 1) => (5, 3, 0, 1)
def updateMetricValue(metric: Long): Long = {
metric * -1L - 1L
}

createMetrics(
updateMetricValue(m.executorDeserializeTime),
updateMetricValue(m.executorDeserializeCpuTime),
updateMetricValue(m.executorRunTime),
updateMetricValue(m.executorCpuTime),
updateMetricValue(m.resultSize),
updateMetricValue(m.jvmGcTime),
updateMetricValue(m.resultSerializationTime),
updateMetricValue(m.memoryBytesSpilled),
updateMetricValue(m.diskBytesSpilled),
updateMetricValue(m.peakExecutionMemory),
updateMetricValue(m.inputMetrics.bytesRead),
updateMetricValue(m.inputMetrics.recordsRead),
updateMetricValue(m.outputMetrics.bytesWritten),
updateMetricValue(m.outputMetrics.recordsWritten),
updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched),
updateMetricValue(m.shuffleReadMetrics.localBlocksFetched),
updateMetricValue(m.shuffleReadMetrics.fetchWaitTime),
updateMetricValue(m.shuffleReadMetrics.remoteBytesRead),
updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk),
updateMetricValue(m.shuffleReadMetrics.localBytesRead),
updateMetricValue(m.shuffleReadMetrics.recordsRead),
updateMetricValue(m.shuffleWriteMetrics.bytesWritten),
updateMetricValue(m.shuffleWriteMetrics.writeTime),
updateMetricValue(m.shuffleWriteMetrics.recordsWritten))
}
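
A quick round-trip check of the encoding described in the comment above (illustrative only, mirroring `updateMetricValue` here and `getMetricValue` in `TaskDataWrapper` below):

```scala
def encode(metric: Long): Long = metric * -1L - 1L     // what updateMetricValue does
def decode(stored: Long): Long = math.abs(stored + 1)  // what getMetricValue does for non-successful tasks

val original = Seq(5L, 3L, 0L, 1L)
val stored   = original.map(encode)   // List(-6, -4, -1, -2)
val restored = stored.map(decode)     // List(5, 3, 0, 1)
assert(restored == original)
```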

private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = {
createMetrics(
m1.executorDeserializeTime + m2.executorDeserializeTime * mult,
76 changes: 44 additions & 32 deletions core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -177,10 +177,13 @@ private[spark] class TaskDataWrapper(
val accumulatorUpdates: Seq[AccumulableInfo],
val errorMessage: Option[String],

val hasMetrics: Boolean,
// The following is an exploded view of a TaskMetrics API object. This saves 5 objects
// (= 80 bytes of Java object overhead) per instance of this wrapper. If the first value
// (executorDeserializeTime) is -1L, it means the metrics for this task have not been
// recorded.
// (= 80 bytes of Java object overhead) per instance of this wrapper. Non successful
// tasks' metrics will have negative values in `TaskDataWrapper`. `TaskData` will have
// actual metric values. To recover the actual metric value from `TaskDataWrapper`,
// need use `getMetricValue` method. If `hasMetrics` is false, it means the metrics
// for this task have not been recorded.
@KVIndexParam(value = TaskIndexNames.DESER_TIME, parent = TaskIndexNames.STAGE)
val executorDeserializeTime: Long,
@KVIndexParam(value = TaskIndexNames.DESER_CPU_TIME, parent = TaskIndexNames.STAGE)
@@ -233,39 +236,46 @@ private[spark] class TaskDataWrapper(
val stageId: Int,
val stageAttemptId: Int) {

def hasMetrics: Boolean = executorDeserializeTime >= 0
// SPARK-26260: To handle non successful tasks metrics (Running, Failed, Killed).
private def getMetricValue(metric: Long): Long = {
if (status != "SUCCESS") {
math.abs(metric + 1)
} else {
metric
}
}

def toApi: TaskData = {
val metrics = if (hasMetrics) {
Some(new TaskMetrics(
executorDeserializeTime,
executorDeserializeCpuTime,
executorRunTime,
executorCpuTime,
resultSize,
jvmGcTime,
resultSerializationTime,
memoryBytesSpilled,
diskBytesSpilled,
peakExecutionMemory,
getMetricValue(executorDeserializeTime),
getMetricValue(executorDeserializeCpuTime),
getMetricValue(executorRunTime),
getMetricValue(executorCpuTime),
getMetricValue(resultSize),
getMetricValue(jvmGcTime),
getMetricValue(resultSerializationTime),
getMetricValue(memoryBytesSpilled),
getMetricValue(diskBytesSpilled),
getMetricValue(peakExecutionMemory),
new InputMetrics(
inputBytesRead,
inputRecordsRead),
getMetricValue(inputBytesRead),
getMetricValue(inputRecordsRead)),
new OutputMetrics(
outputBytesWritten,
outputRecordsWritten),
getMetricValue(outputBytesWritten),
getMetricValue(outputRecordsWritten)),
new ShuffleReadMetrics(
shuffleRemoteBlocksFetched,
shuffleLocalBlocksFetched,
shuffleFetchWaitTime,
shuffleRemoteBytesRead,
shuffleRemoteBytesReadToDisk,
shuffleLocalBytesRead,
shuffleRecordsRead),
getMetricValue(shuffleRemoteBlocksFetched),
getMetricValue(shuffleLocalBlocksFetched),
getMetricValue(shuffleFetchWaitTime),
getMetricValue(shuffleRemoteBytesRead),
getMetricValue(shuffleRemoteBytesReadToDisk),
getMetricValue(shuffleLocalBytesRead),
getMetricValue(shuffleRecordsRead)),
new ShuffleWriteMetrics(
shuffleBytesWritten,
shuffleWriteTime,
shuffleRecordsWritten)))
getMetricValue(shuffleBytesWritten),
getMetricValue(shuffleWriteTime),
getMetricValue(shuffleRecordsWritten))))
} else {
None
}
@@ -296,8 +306,10 @@ private[spark] class TaskDataWrapper(
@JsonIgnore @KVIndex(value = TaskIndexNames.SCHEDULER_DELAY, parent = TaskIndexNames.STAGE)
def schedulerDelay: Long = {
if (hasMetrics) {
AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration, executorDeserializeTime,
resultSerializationTime, executorRunTime)
AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration,
getMetricValue(executorDeserializeTime),
getMetricValue(resultSerializationTime),
getMetricValue(executorRunTime))
} else {
-1L
}
@@ -330,7 +342,7 @@ private[spark] class TaskDataWrapper(
@JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE)
private def shuffleTotalReads: Long = {
if (hasMetrics) {
shuffleLocalBytesRead + shuffleRemoteBytesRead
getMetricValue(shuffleLocalBytesRead) + getMetricValue(shuffleRemoteBytesRead)
} else {
-1L
}
@@ -339,7 +351,7 @@ private[spark] class TaskDataWrapper(
@JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE)
private def shuffleTotalBlocks: Long = {
if (hasMetrics) {
shuffleLocalBlocksFetched + shuffleRemoteBlocksFetched
getMetricValue(shuffleLocalBlocksFetched) + getMetricValue(shuffleRemoteBlocksFetched)
} else {
-1L
}
