From 421c4f604c198422687ddf9795282cbafd4270b2 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Wed, 31 May 2017 22:34:53 -0700
Subject: [PATCH 1/2] [SPARK-20244][CORE] Handle incorrect bytesRead metrics when using PySpark

Hadoop FileSystem's statistics are based on thread-local variables; this is
fine as long as the whole RDD computation chain runs in the same thread. But
if a child RDD creates another thread to consume the iterator obtained from
a Hadoop RDD, the bytesRead computation becomes incorrect, because the
iterator's `next()` and `close()` may then run in different threads. This
can happen when using PySpark with PythonRDD.

So here we build a map to track the `bytesRead` of each thread and sum the
values together. This method is used in three RDDs: `HadoopRDD`,
`NewHadoopRDD` and `FileScanRDD`. I assume `FileScanRDD` cannot be called
directly, so I only fixed `HadoopRDD` and `NewHadoopRDD`.

Verified with a unit test and on a local cluster.

Author: jerryshao

Closes #17617 from jerryshao/SPARK-20244.

Conflicts:
    core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
---
 .../apache/spark/deploy/SparkHadoopUtil.scala | 37 ++++++++++++-------
 .../org/apache/spark/rdd/HadoopRDD.scala      |  8 +++-
 .../org/apache/spark/rdd/NewHadoopRDD.scala   |  8 +++-
 .../metrics/InputOutputMetricsSuite.scala     | 31 +++++++++++++++-
 4 files changed, 67 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index d651d015a1660..7b3e26fcfafdc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,6 +24,7 @@ import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import com.google.common.primitives.Longs
@@ -148,21 +149,29 @@ class SparkHadoopUtil extends Logging {
   /**
    * Returns a function that can be called to find Hadoop FileSystem bytes read. If
    * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
-   * return the bytes read on r since t. Reflection is required because thread-level FileSystem
-   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
-   * Returns None if the required method can't be found.
+   * return the bytes read on r since t.
    */
-  private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
-    try {
-      val threadStats = getFileSystemThreadStatistics()
-      val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
-      val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
-      val baselineBytesRead = f()
-      Some(() => f() - baselineBytesRead)
-    } catch {
-      case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
-        logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
-        None
+  private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
+    val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
+    val baseline = (Thread.currentThread().getId, f())
+
+    /**
+     * This function may be called in both spawned child threads and parent task thread (in
+     * PythonRDD), and Hadoop FileSystem uses thread local variables to track the statistics.
+     * So we need a map to track the bytes read from the child threads and parent thread,
+     * summing them together to get the bytes read of this task.
+     */
+    new Function0[Long] {
+      private val bytesReadMap = new mutable.HashMap[Long, Long]()
+
+      override def apply(): Long = {
+        bytesReadMap.synchronized {
+          bytesReadMap.put(Thread.currentThread().getId, f())
+          bytesReadMap.map { case (k, v) =>
+            v - (if (k == baseline._1) baseline._2 else 0)
+          }.sum
+        }
+      }
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index b56ebf4df06e9..9916c6597a108 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -257,7 +257,13 @@ class HadoopRDD[K, V](
             null
         }
       // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener{ context => closeIfNeeded() }
+      context.addTaskCompletionListener { context =>
+        // Update the bytes read before closing is to make sure lingering bytesRead statistics in
+        // this thread get correctly added.
+        updateBytesRead()
+        closeIfNeeded()
+      }
+
       private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
       private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6168d979032aa..724facd7bd501 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -189,7 +189,13 @@ class NewHadoopRDD[K, V](
         }
 
       // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener(context => close())
+      context.addTaskCompletionListener { context =>
+        // Update the bytesRead before closing is to make sure lingering bytesRead statistics in
+        // this thread get correctly added.
+        updateBytesRead()
+        close()
+      }
+
       private var havePair = false
       private var recordsSinceMetricsUpdate = 0
 
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index f8054f5fd7701..8d3819e44e59b 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -35,7 +35,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
   with BeforeAndAfter {
@@ -331,6 +331,35 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     }
     assert(bytesRead >= tmpFile.length())
   }
+
+  test("input metrics with old Hadoop API in different thread") {
+    val bytesRead = runAndReturnBytesRead {
+      sc.textFile(tmpFilePath, 4).mapPartitions { iter =>
+        val buf = new ArrayBuffer[String]()
+        ThreadUtils.runInNewThread("testThread", false) {
+          iter.flatMap(_.split(" ")).foreach(buf.append(_))
+        }
+
+        buf.iterator
+      }.count()
+    }
+    assert(bytesRead >= tmpFile.length())
+  }
+
+  test("input metrics with new Hadoop API in different thread") {
+    val bytesRead = runAndReturnBytesRead {
+      sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
+        classOf[Text]).mapPartitions { iter =>
+        val buf = new ArrayBuffer[String]()
+        ThreadUtils.runInNewThread("testThread", false) {
+          iter.map(_._2.toString).flatMap(_.split(" ")).foreach(buf.append(_))
+        }
+
+        buf.iterator
+      }.count()
+    }
+    assert(bytesRead >= tmpFile.length())
+  }
 }
 
 /**

From 5f3d0eea3958df4a4c974c90d33b5cdfaedc4278 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Wed, 28 Jun 2017 16:11:24 +0000
Subject: [PATCH 2/2] [YSPARK-711] SparkSession DataFrameReader not showing byte/records Read task stats

---
 .../main/scala/org/apache/spark/rdd/HadoopRDD.scala    |  2 +-
 .../main/scala/org/apache/spark/rdd/NewHadoopRDD.scala |  2 +-
 .../spark/sql/execution/datasources/FileScanRDD.scala  | 10 ++++------
 3 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 9916c6597a108..43e9e1954800f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -227,7 +227,7 @@ class HadoopRDD[K, V](
       // creating RecordReader, because RecordReader's constructor might read some bytes
       private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
         case _: FileSplit | _: CombineFileSplit =>
-          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+          Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
         case _ => None
       }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 724facd7bd501..c323eaf2e6499 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -150,7 +150,7 @@ class NewHadoopRDD[K, V](
       private val getBytesReadCallback: Option[() => Long] =
         split.serializableHadoopSplit.value match {
           case _: FileSplit | _: CombineFileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+            Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
           case _ => None
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 843459221e689..00bf050dfb51f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -74,23 +74,21 @@ class FileScanRDD(
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // apply readFunction, because it might read some bytes.
-      private val getBytesReadCallback: Option[() => Long] =
+      private val getBytesReadCallback =
         SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
 
-      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // We get our input bytes from thread-local Hadoop FileSystem statistics.
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
       private def updateBytesRead(): Unit = {
-        getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
-        }
+        inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
       }
 
       // If we can't get the bytes read from the FS stats, fall back to the file size,
       // which may be inaccurate.
      private def updateBytesReadWithFileSize(): Unit = {
-        if (getBytesReadCallback.isEmpty && currentFile != null) {
+        if (currentFile != null) {
           inputMetrics.incBytesRead(currentFile.length)
         }
       }
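
The following minimal Scala sketch is illustrative only and is not part of either patch; the object name BytesReadCallbackSketch is made up. It restates the idea behind getFSBytesReadOnThreadCallback in patch 1: Hadoop FileSystem statistics are thread-local, so the returned callback keeps a map keyed by thread ID with each thread's latest reading and sums the entries, subtracting the baseline captured in the creating thread so that only bytes read after the callback was created are counted.

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.hadoop.fs.FileSystem

// Hypothetical helper, not Spark code: mirrors the logic that patch 1 adds to
// SparkHadoopUtil.getFSBytesReadOnThreadCallback.
object BytesReadCallbackSketch {

  // Returns a callback reporting the bytes read since creation, summed over every
  // thread that has invoked the callback at least once.
  def create(): () => Long = {
    // Hadoop FileSystem statistics are thread-local: this reads the calling thread's total.
    val readBytes: () => Long =
      () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum

    // Baseline of the creating (task) thread, so bytes read before creation are excluded.
    val baselineThread = Thread.currentThread().getId
    val baselineBytes = readBytes()

    // Latest reading per thread ID, updated by whichever thread invokes the callback.
    val bytesReadMap = mutable.HashMap[Long, Long]()

    () => bytesReadMap.synchronized {
      bytesReadMap.put(Thread.currentThread().getId, readBytes())
      bytesReadMap.map { case (tid, bytes) =>
        bytes - (if (tid == baselineThread) baselineBytes else 0L)
      }.sum
    }
  }
}

Any thread that consumes the Hadoop record iterator (for example a writer thread spawned by PythonRDD) only needs to invoke the callback after it has read; a later invocation from the task thread then reports the task-wide total, which is what updateBytesRead() in HadoopRDD and NewHadoopRDD relies on.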