From 2ca1fbffa9842f4227217c3abdf352eeb8c82e39 Mon Sep 17 00:00:00 2001
From: yumwang
Date: Tue, 11 Aug 2020 17:07:51 +0800
Subject: [PATCH] [SPARK-31793][SQL] Reduce the memory usage in file scan
 location metadata (#8)

### What changes were proposed in this pull request?

Currently, the data source scan node stores all the paths in its metadata. The metadata is kept when a SparkPlan is converted into SparkPlanInfo. SparkPlanInfo can be used to construct the Spark plan graph in the UI. However, the paths can be very large (e.g. there can be many partitions after partition pruning), while UI pages only require up to 100 bytes for the location metadata. We can reduce the paths stored in metadata to reduce memory usage.

### Why are the changes needed?

Reduce unnecessary memory cost. In the heap dump of a driver, the SparkPlanInfo instances are quite large, and this should be avoided:

![image](https://user-images.githubusercontent.com/1097932/82642318-8f65de00-9bc2-11ea-9c9c-f05c2b0e1c49.png)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #28610 from gengliangwang/improveLocationMetadata.

Authored-by: Gengliang Wang
Signed-off-by: Gengliang Wang
(cherry picked from commit 9fdc2a08011f8571516fa9103102481d4cf9cfde)
---
 .../scala/org/apache/spark/util/Utils.scala   | 18 +++++++++++++
 .../org/apache/spark/util/UtilsSuite.scala    |  8 ++++++
 .../sql/execution/DataSourceScanExec.scala    |  7 +++--
 .../execution/datasources/v2/FileScan.scala   |  6 +++--
 .../DataSourceScanExecRedactionSuite.scala    | 26 +++++++++++++++++++
 5 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c7db2127a6f04..9636fe88c77c2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2904,6 +2904,24 @@ private[spark] object Utils extends Logging {
     props.forEach((k, v) => resultProps.put(k, v))
     resultProps
   }
+
+  /**
+   * Convert a sequence of `Path`s to a metadata string. When the length of the metadata
+   * string exceeds `stopAppendingThreshold`, stop appending paths to save memory.
+   */
+  def buildLocationMetadata(paths: Seq[Path], stopAppendingThreshold: Int): String = {
+    val metadata = new StringBuilder("[")
+    var index: Int = 0
+    while (index < paths.length && metadata.length < stopAppendingThreshold) {
+      if (index > 0) {
+        metadata.append(", ")
+      }
+      metadata.append(paths(index).toString)
+      index += 1
+    }
+    metadata.append("]")
+    metadata.toString
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 931eb6b5413f7..c9c8ae6023877 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1301,6 +1301,14 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       assert(Utils.trimExceptCRLF(s"b${s}b") === s"b${s}b")
     }
   }
+
+  test("pathsToMetadata") {
+    val paths = (0 to 4).map(i => new Path(s"path$i"))
+    assert(Utils.buildLocationMetadata(paths, 5) == "[path0]")
+    assert(Utils.buildLocationMetadata(paths, 10) == "[path0, path1]")
+    assert(Utils.buildLocationMetadata(paths, 15) == "[path0, path1, path2]")
+    assert(Utils.buildLocationMetadata(paths, 25) == "[path0, path1, path2, path3]")
+  }
 }
 
 private class SimpleExtension
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 90a3f9788ca42..1711c43e3f8e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -55,10 +55,12 @@ trait DataSourceScanExec extends LeafExecNode {
   // Metadata that describes more details of this scan.
   protected def metadata: Map[String, String]
 
+  protected val maxMetadataValueLength = 100
+
   override def simpleString(maxFields: Int): String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
-        key + ": " + StringUtils.abbreviate(redact(value), 100)
+        key + ": " + StringUtils.abbreviate(redact(value), maxMetadataValueLength)
     }
     val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields)
     redact(
@@ -335,7 +337,8 @@ case class FileSourceScanExec(
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val location = relation.location
     val locationDesc =
-      location.getClass.getSimpleName + seqToString(location.rootPaths)
+      location.getClass.getSimpleName +
+        Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength)
     val metadata =
       Map(
         "Format" -> relation.fileFormat.toString,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 6e05aa56f4f72..7e8e0ed2dc675 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -94,8 +94,10 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin
   override def hashCode(): Int = getClass.hashCode()
 
   override def description(): String = {
+    val maxMetadataValueLength = 100
     val locationDesc =
-      fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
+      fileIndex.getClass.getSimpleName +
+        Utils.buildLocationMetadata(fileIndex.rootPaths, maxMetadataValueLength)
     val metadata: Map[String, String] = Map(
       "ReadSchema" -> readDataSchema.catalogString,
       "PartitionFilters" -> seqToString(partitionFilters),
@@ -105,7 +107,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin
       case (key, value) =>
         val redactedValue =
           Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
-        key + ": " + StringUtils.abbreviate(redactedValue, 100)
+        key + ": " + StringUtils.abbreviate(redactedValue, maxMetadataValueLength)
     }.mkString(", ")
     s"${this.getClass.getSimpleName} $metadataStr"
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
index f1411b263c77b..c99be986ddca5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution
 
+import java.io.File
+
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
@@ -116,6 +118,30 @@ class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
       assert(isIncluded(df.queryExecution, "Location"))
     }
   }
+
+  test("SPARK-31793: FileSourceScanExec metadata should contain limited file paths") {
+    withTempPath { path =>
+      val dir = path.getCanonicalPath
+      val partitionCol = "partitionCol"
+      spark.range(10)
+        .select("id", "id")
+        .toDF("value", partitionCol)
+        .write
+        .partitionBy(partitionCol)
+        .orc(dir)
+      val paths = (0 to 9).map(i => new File(dir, s"$partitionCol=$i").getCanonicalPath)
+      val plan = spark.read.orc(paths: _*).queryExecution.executedPlan
+      val location = plan collectFirst {
+        case f: FileSourceScanExec => f.metadata("Location")
+      }
+      assert(location.isDefined)
+      // The location metadata should contain at least one path
+      assert(location.get.contains(paths.head))
+      // If the temp path is longer than 100 characters, the metadata length should not exceed
+      // twice that length; otherwise, the metadata length should stay within 200.
+      assert(location.get.length < Math.max(paths.head.length, 100) * 2)
+    }
+  }
 }
 
 /**
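For reference, the truncation applied by the new helper can be reproduced with a minimal, self-contained sketch. The loop below is a copy of the `Utils.buildLocationMetadata` addition above, shown purely for illustration: the real helper lives in the `private[spark]` `Utils` object and takes Hadoop `Path`s, so plain strings stand in here, and the expected outputs mirror the `pathsToMetadata` test in this patch.

```scala
// Standalone sketch of the truncation logic added in this patch.
// Plain strings stand in for org.apache.hadoop.fs.Path; the expected
// outputs mirror the assertions in the "pathsToMetadata" test above.
object LocationMetadataSketch {
  def buildLocationMetadata(paths: Seq[String], stopAppendingThreshold: Int): String = {
    val metadata = new StringBuilder("[")
    var index = 0
    // Stop appending once the accumulated string reaches the threshold, so the
    // scan metadata no longer materializes every root path of a large scan.
    while (index < paths.length && metadata.length < stopAppendingThreshold) {
      if (index > 0) {
        metadata.append(", ")
      }
      metadata.append(paths(index))
      index += 1
    }
    metadata.append("]")
    metadata.toString
  }

  def main(args: Array[String]): Unit = {
    val paths = (0 to 4).map(i => s"path$i")
    println(buildLocationMetadata(paths, 5))   // [path0]
    println(buildLocationMetadata(paths, 10))  // [path0, path1]
    println(buildLocationMetadata(paths, 25))  // [path0, path1, path2, path3]
  }
}
```

The result can overshoot the threshold by roughly one extra path plus the closing bracket, which is what the FileSourceScanExec test above accounts for by allowing up to twice `Math.max(paths.head.length, 100)` characters.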