Merge pull request apache#159 from hadoop/YSPARK-711_211
[YSPARK-711] SparkSession DataFrameReader not showing byte/records Re
Dhruve Ashar authored and GitHub Enterprise committed Jun 30, 2017
2 parents b71f0ca + 5f3d0ee commit 7de780b
Showing 5 changed files with 73 additions and 25 deletions.
37 changes: 23 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,6 +24,7 @@ import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}

import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.control.NonFatal

import com.google.common.primitives.Longs
@@ -148,21 +149,29 @@ class SparkHadoopUtil extends Logging {
/**
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
-   * return the bytes read on r since t. Reflection is required because thread-level FileSystem
-   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
-   * Returns None if the required method can't be found.
+   * return the bytes read on r since t.
   */
-  private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
-    try {
-      val threadStats = getFileSystemThreadStatistics()
-      val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
-      val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
-      val baselineBytesRead = f()
-      Some(() => f() - baselineBytesRead)
-    } catch {
-      case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
-        logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
-        None
-    }
-  }
+  private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
+    val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
+    val baseline = (Thread.currentThread().getId, f())
+
+    /**
+     * This function may be called in both spawned child threads and parent task thread (in
+     * PythonRDD), and Hadoop FileSystem uses thread local variables to track the statistics.
+     * So we need a map to track the bytes read from the child threads and parent thread,
+     * summing them together to get the bytes read of this task.
+     */
+    new Function0[Long] {
+      private val bytesReadMap = new mutable.HashMap[Long, Long]()
+
+      override def apply(): Long = {
+        bytesReadMap.synchronized {
+          bytesReadMap.put(Thread.currentThread().getId, f())
+          bytesReadMap.map { case (k, v) =>
+            v - (if (k == baseline._1) baseline._2 else 0)
+          }.sum
+        }
+      }
+    }
+  }
}

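With the reflection-based Hadoop 2.5 fallback removed, the callback always exists, and the interesting part of the new code is the map keyed by thread id: every thread that invokes the callback records its own thread-local byte count, and only the creating thread's baseline is subtracted. The following is a self-contained sketch of that bookkeeping, not Spark or Hadoop code; simulateRead and bytesReadThisThread are hypothetical stand-ins for Hadoop's thread-local FileSystem statistics.

import scala.collection.mutable

// Sketch only (not part of this commit): each thread sees only its own byte counter,
// mimicking Hadoop's thread-local FileSystem statistics.
object BytesReadCallbackSketch {
  private val perThreadBytes = new ThreadLocal[Long] {
    override def initialValue(): Long = 0L
  }

  def simulateRead(n: Long): Unit = perThreadBytes.set(perThreadBytes.get + n)
  private def bytesReadThisThread(): Long = perThreadBytes.get

  // Same shape as the committed getFSBytesReadOnThreadCallback: remember the creating
  // thread's baseline, record the current per-thread count on every invocation, and sum
  // the entries, subtracting the baseline only for the creating thread.
  def makeCallback(): () => Long = {
    val f = () => bytesReadThisThread()
    val baseline = (Thread.currentThread().getId, f())
    val bytesReadMap = new mutable.HashMap[Long, Long]()

    () => bytesReadMap.synchronized {
      bytesReadMap.put(Thread.currentThread().getId, f())
      bytesReadMap.map { case (threadId, bytes) =>
        bytes - (if (threadId == baseline._1) baseline._2 else 0L)
      }.sum
    }
  }

  def main(args: Array[String]): Unit = {
    simulateRead(10)                  // bytes read before the callback exists: the baseline
    val callback = makeCallback()
    simulateRead(100)                 // parent thread reads 100 more bytes
    val child = new Thread(new Runnable {
      override def run(): Unit = {
        simulateRead(50)              // child thread reads 50 bytes
        callback()                    // record the child's contribution in the shared map
      }
    })
    child.start()
    child.join()
    println(callback())               // 150 = (110 - 10 baseline) from the parent + 50 from the child
  }
}

Running main prints 150: the parent thread contributes 110 minus its baseline of 10, and the child thread contributes 50. That is the arithmetic the committed apply() performs for a task whose records are consumed in a spawned thread, the PythonRDD case called out in the comment.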
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -227,7 +227,7 @@ class HadoopRDD[K, V](
// creating RecordReader, because RecordReader's constructor might read some bytes
private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
-        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+        Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}

@@ -257,7 +257,13 @@ class HadoopRDD[K, V](
null
}
// Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener{ context => closeIfNeeded() }
+      context.addTaskCompletionListener { context =>
+        // Update the bytes read before closing to make sure lingering bytesRead statistics in
+        // this thread get correctly added.
+        updateBytesRead()
+        closeIfNeeded()
+      }

private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()

10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -150,7 +150,7 @@ class NewHadoopRDD[K, V](
private val getBytesReadCallback: Option[() => Long] =
split.serializableHadoopSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
-          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+          Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}

@@ -189,7 +189,13 @@ class NewHadoopRDD[K, V](
}

// Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener(context => close())
+      context.addTaskCompletionListener { context =>
+        // Update the bytesRead before closing to make sure lingering bytesRead statistics in
+        // this thread get correctly added.
+        updateBytesRead()
+        close()
+      }

private var havePair = false
private var recordsSinceMetricsUpdate = 0

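In both HadoopRDD and NewHadoopRDD the call site now wraps the result in Some(...), since getFSBytesReadOnThreadCallback no longer returns an Option, and the task-completion listener flushes the input metrics before closing the reader. Below is a condensed sketch of that update-then-close ordering, not part of this commit; BytesReadTrackerSketch and its fields are stand-ins for the real iterator members (inputMetrics, existingBytesRead, closeIfNeeded/close).

// Sketch only: the update-then-close pattern from the two listener changes above.
final class BytesReadTrackerSketch(
    getBytesReadCallback: Option[() => Long],
    existingBytesRead: Long) {

  private var reportedBytesRead = 0L   // stand-in for inputMetrics.bytesRead

  // Called periodically while records are consumed, and once more on task completion.
  def updateBytesRead(): Unit =
    getBytesReadCallback.foreach { cb =>
      reportedBytesRead = existingBytesRead + cb()
    }

  // Mirrors the new task-completion listener: flush lingering statistics, then close.
  def onTaskCompletion(close: () => Unit): Unit = {
    updateBytesRead()
    close()
  }

  def bytesRead: Long = reportedBytesRead
}

If the reader were closed without a final update, bytes read on this thread since the last periodic update would never be reported; flushing first closes that window.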
31 changes: 30 additions & 1 deletion core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -35,7 +35,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{SharedSparkContext, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}

class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
with BeforeAndAfter {
@@ -331,6 +331,35 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
}
assert(bytesRead >= tmpFile.length())
}

test("input metrics with old Hadoop API in different thread") {
val bytesRead = runAndReturnBytesRead {
sc.textFile(tmpFilePath, 4).mapPartitions { iter =>
val buf = new ArrayBuffer[String]()
ThreadUtils.runInNewThread("testThread", false) {
iter.flatMap(_.split(" ")).foreach(buf.append(_))
}

buf.iterator
}.count()
}
assert(bytesRead >= tmpFile.length())
}

test("input metrics with new Hadoop API in different thread") {
val bytesRead = runAndReturnBytesRead {
sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
classOf[Text]).mapPartitions { iter =>
val buf = new ArrayBuffer[String]()
ThreadUtils.runInNewThread("testThread", false) {
iter.map(_._2.toString).flatMap(_.split(" ")).foreach(buf.append(_))
}

buf.iterator
}.count()
}
assert(bytesRead >= tmpFile.length())
}
}

/**
10 changes: 4 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -74,23 +74,21 @@ class FileScanRDD(

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// apply readFunction, because it might read some bytes.
-  private val getBytesReadCallback: Option[() => Long] =
+  private val getBytesReadCallback =
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()

-  // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+  // We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
private def updateBytesRead(): Unit = {
-    getBytesReadCallback.foreach { getBytesRead =>
-      inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
-    }
+    inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
}

// If we can't get the bytes read from the FS stats, fall back to the file size,
// which may be inaccurate.
private def updateBytesReadWithFileSize(): Unit = {
-    if (getBytesReadCallback.isEmpty && currentFile != null) {
+    if (currentFile != null) {
inputMetrics.incBytesRead(currentFile.length)
}
}
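Because the callback is no longer optional, updateBytesRead in FileScanRDD collapses to a single setBytesRead call, and the file-size fallback keeps only the currentFile null check. The SPARK-13071 comment explains the existingBytesRead term; the sketch below is plain Scala, not Spark code, with threadBytes and InputMetricsSketch as hypothetical stand-ins for the thread-local FileSystem statistics and InputMetrics, showing why that term is added back when several coalesced partitions run in the same thread.

// Sketch only: each partition's callback has a baseline of "bytes read so far on this
// thread", so it measures just that partition; existingBytesRead preserves what earlier
// partitions in the same task already reported.
object CoalescedPartitionsSketch {

  final class InputMetricsSketch {          // stand-in for the task's InputMetrics
    private var _bytesRead = 0L
    def setBytesRead(v: Long): Unit = _bytesRead = v
    def bytesRead: Long = _bytesRead
  }

  def main(args: Array[String]): Unit = {
    val metrics = new InputMetricsSketch
    var threadBytes = 0L                    // stand-in for the thread-local FS statistics

    def newCallback(): () => Long = {       // per-partition callback with a baseline
      val baseline = threadBytes
      () => threadBytes - baseline
    }

    for (partitionBytes <- Seq(100L, 250L)) {   // two partitions computed in the same thread
      val existingBytesRead = metrics.bytesRead // already reported by earlier partitions
      val callback = newCallback()
      threadBytes += partitionBytes             // "read" this partition's data
      metrics.setBytesRead(existingBytesRead + callback())
    }
    println(metrics.bytesRead)              // 350, not just the last partition's 250
  }
}

Without the existingBytesRead term, the second partition's setBytesRead(callback()) would report 250 and silently discard the 100 bytes already attributed to the first partition.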
