SPARK-3179. Add task OutputMetrics.
Author: Sandy Ryza <sandy@cloudera.com>

This patch had conflicts when merged, resolved by
Committer: Kay Ousterhout <kayousterhout@gmail.com>

Closes #2968 from sryza/sandy-spark-3179 and squashes the following commits:

dce4784 [Sandy Ryza] More review feedback
8d350d1 [Sandy Ryza] Fix test against Hadoop 2.5+
e7c74d0 [Sandy Ryza] More review feedback
6cff9c4 [Sandy Ryza] Review feedback
fb2dde0 [Sandy Ryza] SPARK-3179
sryza authored and kayousterhout committed Nov 10, 2014
1 parent f8e5732 commit 3c2cff4
Showing 16 changed files with 346 additions and 31 deletions.
46 changes: 39 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,6 +17,7 @@

package org.apache.spark.deploy

import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
@@ -133,14 +134,9 @@ class SparkHadoopUtil extends Logging {
*/
private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
val scheme = qualifiedPath.toUri().getScheme()
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
try {
val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
val statisticsDataClass =
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
val threadStats = getFileSystemThreadStatistics(path, conf)
val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesRead = f()
Some(() => f() - baselineBytesRead)
@@ -151,6 +147,42 @@
}
}
}

/**
* Returns a function that can be called to find Hadoop FileSystem bytes written. If
* getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
* return the bytes written on r since t. Reflection is required because thread-level FileSystem
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
try {
val threadStats = getFileSystemThreadStatistics(path, conf)
val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesWritten = f()
Some(() => f() - baselineBytesWritten)
} catch {
case e: NoSuchMethodException => {
logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
None
}
}
}

private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
val scheme = qualifiedPath.toUri().getScheme()
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}

private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
val statisticsDataClass =
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
statisticsDataClass.getDeclaredMethod(methodName)
}
}

object SparkHadoopUtil {
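To illustrate how the callback returned above is meant to be consumed, here is a minimal sketch, assuming caller code that lives under the org.apache.spark package (the method is private[spark]), a hypothetical package and output path, and Hadoop 2.5+; it is not part of this patch. The callback is created before any records are written, so it snapshots the thread's baseline counter, and each later invocation returns only the bytes written by that thread since then.

package org.apache.spark.examples.outputmetrics // hypothetical package, needed for private[spark] access

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil

object BytesWrittenCallbackSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val outputPath = new Path("hdfs:///tmp/example-output") // illustrative location

    // Obtain the callback before writing anything; on Hadoop < 2.5 it is None and the
    // caller simply skips the metric.
    val bytesWrittenCallback: Option[() => Long] =
      SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outputPath, conf)

    // ... write records to outputPath on this same thread ...

    // Each call reports the bytes written by this thread since the callback was created.
    bytesWrittenCallback.foreach(fn => println(s"bytes written so far: ${fn()}"))
  }
}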
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -82,6 +82,12 @@ class TaskMetrics extends Serializable {
*/
var inputMetrics: Option[InputMetrics] = None

/**
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
* data was written are stored here.
*/
var outputMetrics: Option[OutputMetrics] = None

/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
* This includes read metrics aggregated over all the task's shuffle dependencies.
@@ -157,6 +163,16 @@ object DataReadMethod extends Enumeration with Serializable {
val Memory, Disk, Hadoop, Network = Value
}

/**
* :: DeveloperApi ::
* Method by which output data was written.
*/
@DeveloperApi
object DataWriteMethod extends Enumeration with Serializable {
type DataWriteMethod = Value
val Hadoop = Value
}

/**
* :: DeveloperApi ::
* Metrics about reading input data.
@@ -169,6 +185,18 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
var bytesRead: Long = 0L
}

/**
* :: DeveloperApi ::
* Metrics about writing output data.
*/
@DeveloperApi
case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
/**
* Total bytes written
*/
var bytesWritten: Long = 0L
}

/**
* :: DeveloperApi ::
* Metrics pertaining to shuffle data read in a given task.
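As a sketch of how the new OutputMetrics surfaces outside the executor, the listener below sums bytesWritten over completed tasks, much as the UI listeners changed later in this commit do. The listener class, its field names, and the registration shown in the trailing comment are illustrative, not part of the patch.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener: accumulates the bytes written reported by finished tasks.
class OutputBytesListener extends SparkListener {
  @volatile private var totalBytesWritten = 0L

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks; outputMetrics is None unless the task wrote output.
    Option(taskEnd.taskMetrics).flatMap(_.outputMetrics).foreach { om =>
      totalBytesWritten += om.bytesWritten
    }
  }

  def total: Long = totalBytesWritten
}

// Usage, assuming an existing SparkContext named sc:
//   val listener = new OutputBytesListener
//   sc.addSparkListener(listener)
//   ... run a job that writes output, then read listener.total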
51 changes: 48 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -28,7 +28,7 @@ import scala.reflect.ClassTag

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
@@ -40,6 +40,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
@@ -962,30 +963,40 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
val config = wrappedConf.value
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val hadoopContext = newTaskAttemptContext(config, attemptId)
val format = outfmt.newInstance
format match {
case c: Configurable => c.setConf(wrappedConf.value)
case c: Configurable => c.setConf(config)
case _ => ()
}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)

val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)

val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
try {
var recordsWritten = 0L
while (iter.hasNext) {
val pair = iter.next()
writer.write(pair._1, pair._2)

// Update bytes written metric every few records
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
} finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
1
} : Int

@@ -1006,6 +1017,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsHadoopDataset(conf: JobConf) {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val wrappedConf = new SerializableWritable(hadoopConf)
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
@@ -1033,27 +1045,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.preSetup()

val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
val config = wrappedConf.value
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt

val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)

writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
try {
var recordsWritten = 0L
while (iter.hasNext) {
val record = iter.next()
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

// Update bytes written metric every few records
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
} finally {
writer.close()
}
writer.commit()
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
}

self.context.runJob(self, writeToFile)
writer.commitJob()
}

private def initHadoopOutputMetrics(context: TaskContext, config: Configuration)
: (OutputMetrics, Option[() => Long]) = {
val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
.map(new Path(_))
.flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
if (bytesWrittenCallback.isDefined) {
context.taskMetrics.outputMetrics = Some(outputMetrics)
}
(outputMetrics, bytesWrittenCallback)
}

private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
&& bytesWrittenCallback.isDefined) {
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
}
}

/**
* Return an RDD with the keys of each tuple.
*/
@@ -1070,3 +1111,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
}

private[spark] object PairRDDFunctions {
val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
}
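The two private helpers above are easiest to read as one sampling pattern: poll the reflection-based bytes-written callback only once every RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES records, then once more after the writer is closed and committed so the final number is exact. Below is a standalone sketch of that pattern; the records iterator and writeRecord function are hypothetical stand-ins for the real Hadoop record writer, not the patch's code.

object SampledBytesWrittenUpdate {
  // Mirrors PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES.
  val RecordsBetweenUpdates = 256

  def writeAll(records: Iterator[String],
               writeRecord: String => Unit,
               bytesWrittenCallback: Option[() => Long]): Long = {
    var bytesWritten = 0L
    var recordsWritten = 0L
    while (records.hasNext) {
      writeRecord(records.next())
      // Polling the callback goes through reflection, so only do it every few records.
      if (recordsWritten % RecordsBetweenUpdates == 0) {
        bytesWrittenCallback.foreach { fn => bytesWritten = fn() }
      }
      recordsWritten += 1
    }
    // One final update after everything is written, so the reported value is not up to
    // 255 records stale.
    bytesWrittenCallback.foreach { fn => bytesWritten = fn() }
    bytesWritten
  }
}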
@@ -158,6 +158,11 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" INPUT_BYTES=" + metrics.bytesRead
case None => ""
}
val outputMetrics = taskMetrics.outputMetrics match {
case Some(metrics) =>
" OUTPUT_BYTES=" + metrics.bytesWritten
case None => ""
}
val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -173,7 +178,7 @@
" SHUFFLE_WRITE_TIME=" + metrics.shuffleWriteTime
case None => ""
}
stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics +
stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics + outputMetrics +
shuffleReadMetrics + writeMetrics)
}

2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -29,6 +29,8 @@ private[spark] object ToolTips {

val INPUT = "Bytes read from Hadoop or from Spark storage."

val OUTPUT = "Bytes written to Hadoop."

val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."

val SHUFFLE_READ =
@@ -48,6 +48,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
val executorToInputBytes = HashMap[String, Long]()
val executorToOutputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()

@@ -78,6 +79,10 @@
executorToInputBytes(eid) =
executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
}
metrics.outputMetrics.foreach { outputMetrics =>
executorToOutputBytes(eid) =
executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten
}
metrics.shuffleReadMetrics.foreach { shuffleRead =>
executorToShuffleRead(eid) =
executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
@@ -45,6 +45,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr
<th>Failed Tasks</th>
<th>Succeeded Tasks</th>
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
<th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle Write</span></th>
<th>Shuffle Spill (Memory)</th>
@@ -77,6 +78,8 @@
<td>{v.succeededTasks}</td>
<td sorttable_customkey={v.inputBytes.toString}>
{Utils.bytesToString(v.inputBytes)}</td>
<td sorttable_customkey={v.outputBytes.toString}>
{Utils.bytesToString(v.outputBytes)}</td>
<td sorttable_customkey={v.shuffleRead.toString}>
{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customkey={v.shuffleWrite.toString}>
@@ -259,6 +259,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.inputBytes += inputBytesDelta
execSummary.inputBytes += inputBytesDelta

val outputBytesDelta =
(taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L)
- oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L))
stageData.outputBytes += outputBytesDelta
execSummary.outputBytes += outputBytesDelta

val diskSpillDelta =
taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
stageData.diskBytesSpilled += diskSpillDelta
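One detail worth spelling out in the JobProgressListener change above: a task's metrics can be reported more than once (for example through executor heartbeat updates), so the listener adds the difference between the newly reported cumulative value and the value it recorded previously, rather than the raw value. A simplified restatement of just the output-bytes delta, assuming only the TaskMetrics and OutputMetrics types added by this patch:

import org.apache.spark.executor.TaskMetrics

object OutputBytesDeltaSketch {
  // New cumulative bytesWritten minus whatever was already counted for this task, so
  // re-reported metrics are not double-counted when aggregated per stage or executor.
  def outputBytesDelta(newMetrics: TaskMetrics, oldMetrics: Option[TaskMetrics]): Long = {
    val newBytes = newMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L)
    val oldBytes = oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L)
    newBytes - oldBytes
  }
}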