add metric for sort time of spill data (apache#44)
yma11 authored Nov 17, 2020
1 parent 022418a commit 915d7c4
Showing 15 changed files with 68 additions and 5 deletions.
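
Note on where the metric comes from: the hunks below only plumb spillSortTime through the UI, the status store, the REST API, and the event-log JSON; none of them show the spill path that actually calls incSpillSortTime. A minimal self-contained sketch of the assumed contract follows (assumptions: values are recorded in nanoseconds, matching the / 1000000 millisecond conversions in the hunks below; LongAccumulator here is a local stand-in rather than Spark's class, and sortBeforeSpill is a hypothetical helper):

import java.util.concurrent.TimeUnit

final class LongAccumulator {  // local stand-in for Spark's LongAccumulator
  private var total = 0L
  def add(v: Long): Unit = total += v
  def sum: Long = total
}

object SpillSortTimingSketch {
  private val _spillSortTime = new LongAccumulator
  def spillSortTime: Long = _spillSortTime.sum
  def incSpillSortTime(v: Long): Unit = _spillSortTime.add(v)

  // Hypothetical instrumentation site: time only the in-memory sort that
  // happens before spill data is written to disk, and record nanoseconds.
  def sortBeforeSpill[T](records: Seq[T])(implicit ord: Ordering[T]): Seq[T] = {
    val start = System.nanoTime()
    val sorted = records.sorted
    incSpillSortTime(System.nanoTime() - start)
    sorted
  }

  def main(args: Array[String]): Unit = {
    sortBeforeSpill(Seq(3, 1, 2))
    // Same nanoseconds -> milliseconds conversion the UI hunks apply.
    println(s"spill sort time: ${TimeUnit.NANOSECONDS.toMillis(spillSortTime)} ms")
  }
}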
14 changes: 14 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/stagepage.js
@@ -868,6 +868,20 @@ $(document).ready(function () {
},
name: "Spill Write Time"
},
{
data : function (row, type) {
if (row.taskMetrics && row.taskMetrics.diskBytesSpilled > 0) {
if (type === 'display') {
return formatDuration(parseInt(row.taskMetrics.spillSortTime) / 1000000);
} else {
return row.taskMetrics.spillSortTime;
}
} else {
return "";
}
},
name: "Spill Sort Time"
},
{
data : function (row, type) {
if (row.taskMetrics && row.taskMetrics.diskBytesSpilled > 0) {
@@ -114,6 +114,7 @@ <h4 id="tasksTitle" class="title-table"></h4>
<th>Spill (Memory)</th>
<th>Spill (Disk)</th>
<th>Spill Write Time</th>
<th>Spill Sort Time</th>
<th>Spill Read Time</th>
<th>Spill Delete Time</th>
<th>Errors</th>
@@ -42,6 +42,7 @@ private[spark] object InternalAccumulator {
val SHUFFLE_SPILL_WRITE_TIME = METRICS_PREFIX + "shuffleSpillWriteTime"
val SHUFFLE_SPILL_READ_TIME = METRICS_PREFIX + "shuffleSpillReadTime"
val SHUFFLE_SPILL_DELETE_TIME = METRICS_PREFIX + "shuffleSpillDeleteTime"
val SPILL_SORT_TIME = METRICS_PREFIX + "spillSortTime"
val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses"
val TEST_ACCUM = METRICS_PREFIX + "testAccumulator"
@@ -57,6 +57,7 @@ class TaskMetrics private[spark] () extends Serializable {
private val _shuffleSpillWriteTime = new LongAccumulator
private val _shuffleSpillReadTime = new LongAccumulator
private val _shuffleSpillDeleteTime = new LongAccumulator
private val _spillSortTime = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]

@@ -108,7 +109,7 @@ class TaskMetrics private[spark] () extends Serializable {
def shuffleSpillWriteTime: Long = _shuffleSpillWriteTime.sum
def shuffleSpillReadTime: Long = _shuffleSpillReadTime.sum
def shuffleSpillDeleteTime: Long = _shuffleSpillDeleteTime.sum
def spillSortTime: Long = _spillSortTime.sum

/**
* Peak memory used by internal data structures created during shuffles, aggregations and
* joins. The value of this accumulator should be approximately the sum of the peak sizes
@@ -149,6 +150,7 @@ class TaskMetrics private[spark] () extends Serializable {
private[spark] def incShuffleSpillWriteTime(v: Long): Unit = _shuffleSpillWriteTime.add(v)
private[spark] def incShuffleSpillReadTime(v: Long): Unit = _shuffleSpillReadTime.add(v)
private[spark] def incShuffleSpillDeleteTime(v: Long): Unit = _shuffleSpillDeleteTime.add(v)
private[spark] def incSpillSortTime(v: Long): Unit = _spillSortTime.add(v)
private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
private[spark] def incUpdatedBlockStatuses(v: (BlockId, BlockStatus)): Unit =
_updatedBlockStatuses.add(v)
@@ -228,6 +230,7 @@ class TaskMetrics private[spark] () extends Serializable {
MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
DISK_BYTES_SPILLED -> _diskBytesSpilled,
SHUFFLE_SPILL_WRITE_TIME -> _shuffleSpillWriteTime,
SPILL_SORT_TIME -> _spillSortTime,
SHUFFLE_SPILL_READ_TIME -> _shuffleSpillReadTime,
SHUFFLE_SPILL_DELETE_TIME -> _shuffleSpillDeleteTime,
PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
@@ -210,6 +210,7 @@ private[spark] class AppStatusStore(
diskBytesSpilled = toValues(_.diskBytesSpilled),

shuffleSpillWriteTime = toValues(_.shuffleSpillWriteTime),
spillSortTime = toValues(_.spillSortTime),
shuffleSpillReadTime = toValues(_.shuffleSpillReadTime),
shuffleSpillDeleteTime = toValues(_.shuffleSpillDeleteTime),
inputMetrics = new v1.InputMetricDistributions(
@@ -291,10 +292,13 @@ private[spark] class AppStatusStore(
diskBytesSpilled = scanTasks(TaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled },
shuffleSpillWriteTime =
scanTasks(TaskIndexNames.SHUFFLE_SPILL_WRITE_TIME) { t => t.shuffleSpillWriteTime },
spillSortTime =
scanTasks(TaskIndexNames.SPILL_SORT_TIME) { t => t.spillSortTime },
shuffleSpillReadTime =
scanTasks(TaskIndexNames.SHUFFLE_SPILL_READ_TIME) { t => t.shuffleSpillReadTime },
shuffleSpillDeleteTime =
scanTasks(TaskIndexNames.SHUFFLE_SPILL_DELETE_TIME) { t => t.shuffleSpillDeleteTime },

inputMetrics = new v1.InputMetricDistributions(
scanTasks(TaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead },
scanTasks(TaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }),
@@ -339,9 +343,11 @@ private[spark] class AppStatusStore(
memoryBytesSpilled = computedQuantiles.memoryBytesSpilled(idx),
diskBytesSpilled = computedQuantiles.diskBytesSpilled(idx),
shuffleSpillWriteTime = computedQuantiles.shuffleSpillWriteTime(idx),
spillSortTime = computedQuantiles.spillSortTime(idx),
shuffleSpillReadTime = computedQuantiles.shuffleSpillReadTime(idx),
shuffleSpillDeleteTime = computedQuantiles.shuffleSpillDeleteTime(idx),

bytesRead = computedQuantiles.inputMetrics.bytesRead(idx),
recordsRead = computedQuantiles.inputMetrics.recordsRead(idx),

@@ -476,6 +482,7 @@ private[spark] class AppStatusStore(
memoryBytesSpilled = stage.memoryBytesSpilled,
diskBytesSpilled = stage.diskBytesSpilled,
shuffleSpillWriteTime = stage.shuffleSpillWriteTime,
spillSortTime = stage.spillSortTime,
shuffleSpillReadTime = stage.shuffleSpillReadTime,
shuffleSpillDeleteTime = stage.shuffleSpillDeleteTime,
peakExecutionMemory = stage.peakExecutionMemory,
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -149,6 +149,7 @@ private class LiveTask(
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,
metrics.shuffleSpillWriteTime,
metrics.spillSortTime,
metrics.shuffleSpillReadTime,
metrics.shuffleSpillDeleteTime,
metrics.peakExecutionMemory,
@@ -227,6 +228,7 @@ private class LiveTask(
taskMetrics.memoryBytesSpilled,
taskMetrics.diskBytesSpilled,
taskMetrics.shuffleSpillWriteTime,
taskMetrics.spillSortTime,
taskMetrics.shuffleSpillReadTime,
taskMetrics.shuffleSpillDeleteTime,
taskMetrics.peakExecutionMemory,
@@ -370,6 +372,7 @@ private class LiveExecutorStageSummary(
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,
metrics.shuffleSpillWriteTime,
metrics.spillSortTime,
metrics.shuffleSpillReadTime,
metrics.shuffleSpillDeleteTime,
isBlacklisted)
@@ -449,6 +452,7 @@ private class LiveStage extends LiveEntity {
diskBytesSpilled = metrics.diskBytesSpilled,

shuffleSpillWriteTime = metrics.shuffleSpillWriteTime,
spillSortTime = metrics.spillSortTime,
shuffleSpillReadTime = metrics.shuffleSpillReadTime,
shuffleSpillDeleteTime = metrics.shuffleSpillDeleteTime,

@@ -691,6 +695,7 @@ private[spark] object LiveEntityHelpers {
memoryBytesSpilled: Long,
diskBytesSpilled: Long,
shuffleSpillWriteTime: Long,
spillSortTime: Long,
shuffleSpillReadTime: Long,
shuffleSpillDeleteTime: Long,
peakExecutionMemory: Long,
@@ -719,6 +724,7 @@ private[spark] object LiveEntityHelpers {
memoryBytesSpilled,
diskBytesSpilled,
shuffleSpillWriteTime,
spillSortTime,
shuffleSpillReadTime,
shuffleSpillDeleteTime,
peakExecutionMemory,
@@ -745,7 +751,7 @@ private[spark] object LiveEntityHelpers {

def createMetrics(default: Long): v1.TaskMetrics = {
createMetrics(default, default, default, default, default, default, default, default,
-     default, default, default, default, default, default, default, default,
+     default, default, default, default, default, default, default, default, default,
default, default, default, default, default, default, default, default,
default, default, default)
}
@@ -782,6 +788,7 @@ private[spark] object LiveEntityHelpers {
updateMetricValue(m.memoryBytesSpilled),
updateMetricValue(m.diskBytesSpilled),
updateMetricValue(m.shuffleSpillWriteTime),
updateMetricValue(m.spillSortTime),
updateMetricValue(m.shuffleSpillReadTime),
updateMetricValue(m.shuffleSpillDeleteTime),
updateMetricValue(m.peakExecutionMemory),
@@ -813,6 +820,7 @@ private[spark] object LiveEntityHelpers {
m1.memoryBytesSpilled + m2.memoryBytesSpilled * mult,
m1.diskBytesSpilled + m2.diskBytesSpilled * mult,
m1.shuffleSpillWriteTime + m2.shuffleSpillWriteTime * mult,
m1.spillSortTime + m2.spillSortTime * mult,
m1.shuffleSpillReadTime + m2.shuffleSpillReadTime * mult,
m1.shuffleSpillDeleteTime + m2.shuffleSpillDeleteTime * mult,
m1.peakExecutionMemory + m2.peakExecutionMemory * mult,
@@ -194,6 +194,8 @@ private[v1] class StagesResource extends BaseAppResource {
|| containsValue(Utils.bytesToString(task.taskMetrics.get.diskBytesSpilled))
|| containsValue(UIUtils.formatDuration(
task.taskMetrics.get.shuffleSpillWriteTime / 1000000))
|| containsValue(UIUtils.formatDuration(
task.taskMetrics.get.spillSortTime / 1000000))
|| containsValue(UIUtils.formatDuration(
task.taskMetrics.get.shuffleSpillReadTime / 1000000))
|| containsValue(UIUtils.formatDuration(
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -78,6 +78,7 @@ class ExecutorStageSummary private[spark](
val memoryBytesSpilled : Long,
val diskBytesSpilled : Long,
val shuffleSpillWriteTime : Long,
val spillSortTime: Long,
val shuffleSpillReadTime : Long,
val shuffleSpillDeleteTime : Long,
val isBlacklistedForStage: Boolean)
@@ -230,6 +231,7 @@ class StageData private[spark](
val memoryBytesSpilled: Long,
val diskBytesSpilled: Long,
val shuffleSpillWriteTime: Long,
val spillSortTime: Long,
val shuffleSpillReadTime: Long,
val shuffleSpillDeleteTime: Long,
val peakExecutionMemory: Long,
@@ -292,6 +294,7 @@ class TaskMetrics private[spark](
val memoryBytesSpilled: Long,
val diskBytesSpilled: Long,
val shuffleSpillWriteTime: Long,
val spillSortTime: Long,
val shuffleSpillReadTime: Long,
val shuffleSpillDeleteTime: Long,
val peakExecutionMemory: Long,
@@ -338,6 +341,7 @@ class TaskMetricDistributions private[spark](
val memoryBytesSpilled: IndexedSeq[Double],
val diskBytesSpilled: IndexedSeq[Double],
val shuffleSpillWriteTime: IndexedSeq[Double],
val spillSortTime: IndexedSeq[Double],
val shuffleSpillReadTime: IndexedSeq[Double],
val shuffleSpillDeleteTime: IndexedSeq[Double],

7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -140,6 +140,7 @@ private[spark] object TaskIndexNames {
final val SHUFFLE_SPILL_WRITE_TIME = "sswt"
final val SHUFFLE_SPILL_READ_TIME = "ssrt"
final val SHUFFLE_SPILL_DELETE_TIME = "ssdt"
final val SPILL_SORT_TIME = "sst"
final val STAGE = "stage"
final val STATUS = "sta"
final val TASK_INDEX = "idx"
@@ -208,10 +209,12 @@ private[spark] class TaskDataWrapper(
@KVIndexParam(value = TaskIndexNames.SHUFFLE_SPILL_WRITE_TIME, parent = TaskIndexNames.STAGE)
val shuffleSpillWriteTime: Long,
@KVIndexParam(value = TaskIndexNames.SPILL_SORT_TIME, parent = TaskIndexNames.STAGE)
val spillSortTime: Long,
@KVIndexParam(value = TaskIndexNames.SHUFFLE_SPILL_READ_TIME, parent = TaskIndexNames.STAGE)
val shuffleSpillReadTime: Long,
@KVIndexParam(value = TaskIndexNames.SHUFFLE_SPILL_DELETE_TIME, parent = TaskIndexNames.STAGE)
val shuffleSpillDeleteTime: Long,
@KVIndexParam(value = TaskIndexNames.PEAK_MEM, parent = TaskIndexNames.STAGE)
val peakExecutionMemory: Long,
@KVIndexParam(value = TaskIndexNames.INPUT_SIZE, parent = TaskIndexNames.STAGE)
val inputBytesRead: Long,
@@ -268,6 +271,7 @@ private[spark] class TaskDataWrapper(
getMetricValue(memoryBytesSpilled),
getMetricValue(diskBytesSpilled),
getMetricValue(shuffleSpillWriteTime),
getMetricValue(spillSortTime),
getMetricValue(shuffleSpillReadTime),
getMetricValue(shuffleSpillDeleteTime),
getMetricValue(peakExecutionMemory),
@@ -493,6 +497,7 @@ private[spark] class CachedQuantile(
val diskBytesSpilled: Double,

val shuffleSpillWriteTime: Double,
val spillSortTime: Double,
val shuffleSpillReadTime: Double,
val shuffleSpillDeleteTime: Double,
val bytesRead: Double,
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -245,6 +245,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
shuffleWriteRecords = 0L,

shuffleSpillWriteTime = 0L,
spillSortTime = 0L,
shuffleSpillReadTime = 0L,
shuffleSpillDeleteTime = 0L,
name = "Unknown",
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -189,6 +189,10 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
<strong>Spill Write Time</strong>
{s"${UIUtils.formatDuration(stageData.shuffleSpillWriteTime / 1000000)}"}
</li>
<li>
<strong>Spill Sort Time</strong>
{s"${UIUtils.formatDuration(stageData.spillSortTime / 1000000)}"}
</li>
<li>
<strong>Spill Read Time</strong>
{s"${UIUtils.formatDuration(stageData.shuffleSpillReadTime / 1000000)}"}
@@ -714,6 +718,13 @@ private[ui] class TaskPagedTable(
},
hideZero = true)
}</td>
<td>{
formatDuration(
task.taskMetrics.map { m =>
TimeUnit.NANOSECONDS.toMillis(m.spillSortTime)
},
hideZero = true)
}</td>
<td>{
formatDuration(
task.taskMetrics.map { m =>
@@ -789,6 +800,7 @@ private[spark] object ApiHelper {
val HEADER_SHUFFLE_SPILL_WRITE_TIME = "Spill Write Time"
val HEADER_SHUFFLE_SPILL_READ_TIME = "Spill Read Time"
val HEADER_SHUFFLE_SPILL_DELETE_TIME = "Spill Delete Time"
val HEADER_SPILL_SORT_TIME = "Spill Sort Time"
val HEADER_MEM_SPILL = "Spill (Memory)"
val HEADER_DISK_SPILL = "Spill (Disk)"
val HEADER_ERROR = "Errors"
@@ -820,6 +832,7 @@ private[spark] object ApiHelper {
HEADER_SHUFFLE_WRITE_TIME -> TaskIndexNames.SHUFFLE_WRITE_TIME,
HEADER_SHUFFLE_WRITE_SIZE -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
HEADER_SHUFFLE_SPILL_WRITE_TIME -> TaskIndexNames.SHUFFLE_SPILL_WRITE_TIME,
HEADER_SPILL_SORT_TIME -> TaskIndexNames.SPILL_SORT_TIME,
HEADER_SHUFFLE_SPILL_READ_TIME -> TaskIndexNames.SHUFFLE_SPILL_READ_TIME,
HEADER_SHUFFLE_SPILL_DELETE_TIME -> TaskIndexNames.SHUFFLE_SPILL_DELETE_TIME,
HEADER_MEM_SPILL -> TaskIndexNames.MEM_SPILL,
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -399,6 +399,7 @@ private[spark] object JsonProtocol {
("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~
("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
("Spill Write Time" -> taskMetrics.shuffleSpillWriteTime) ~
("Spill Sort Time" -> taskMetrics.spillSortTime) ~
("Spill Read Time" -> taskMetrics.shuffleSpillReadTime) ~
("Spill Delete Time" -> taskMetrics.shuffleSpillDeleteTime) ~
("Shuffle Read Metrics" -> shuffleReadMetrics) ~
@@ -910,6 +911,7 @@ private[spark] object JsonProtocol {
metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long])
metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])
metrics.incShuffleSpillWriteTime((json \ "Spill Write Time").extract[Long])
metrics.incSpillSortTime((json \ "Spill Sort Time").extract[Long])
metrics.incShuffleSpillReadTime((json \ "Spill Read Time").extract[Long])
metrics.incShuffleSpillDeleteTime((json \ "Spill Delete Time").extract[Long])

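The writer and reader hunks above must agree on the literal key name; a standalone round-trip check of that invariant (assumptions: json4s, the library behind JsonProtocol's ~ and \ DSL, is on the classpath, and the stored value is raw nanoseconds):

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Standalone sketch, not Spark code: checks that the "Spill Sort Time" key
// written by the first hunk is the exact key the second hunk extracts, so the
// raw value survives an event-log round trip unchanged.
object SpillSortTimeJsonCheck {
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    val written: JValue = ("Spill Sort Time" -> 123456789L)  // assumed nanoseconds
    val readBack = (parse(compact(render(written))) \ "Spill Sort Time").extract[Long]
    assert(readBack == 123456789L)
    println(compact(render(written)))  // {"Spill Sort Time":123456789}
  }
}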
@@ -157,7 +157,7 @@ class AppStatusStoreSuite extends SparkFunSuite {
i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, true,
i, i, i, i, i, i, i, i, i, i,
i, i, i, i, i, i, i, i, i, i,
-     i, i, i, i, i, i, i, stageId, attemptId)
+     i, i, i, i, i, i, i, i, stageId, attemptId)
}

private def writeTaskDataToStore(i: Int, store: KVStore, status: String): Unit = {
@@ -49,6 +49,7 @@ class AppStatusUtilsSuite extends SparkFunSuite {
memoryBytesSpilled = 0L,
diskBytesSpilled = 0L,
shuffleSpillWriteTime = 0L,
spillSortTime = 0L,
shuffleSpillReadTime = 0L,
shuffleSpillDeleteTime = 0L,
peakExecutionMemory = 0L,
@@ -86,6 +87,7 @@ class AppStatusUtilsSuite extends SparkFunSuite {
memoryBytesSpilled = 0L,
diskBytesSpilled = 0L,
shuffleSpillWriteTime = 0L,
spillSortTime = 0L,
shuffleSpillReadTime = 0L,
shuffleSpillDeleteTime = 0L,
peakExecutionMemory = 100L,
@@ -82,10 +82,10 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
shuffleWriteTime = 1L,
shuffleWriteRecords = 1L,
shuffleSpillWriteTime = 1L,
spillSortTime = 1L,
shuffleSpillReadTime = 1L,
shuffleSpillDeleteTime = 1L,

name = "stage1",
description = Some("description"),
details = "detail",
