Skip to content

Commit

Permalink
[SPARK-31793][SQL] Reduce the memory usage in file scan location meta…
Browse files Browse the repository at this point in the history
…data (#8)

### What changes were proposed in this pull request?

Currently, the data source scan node stores all the paths in its metadata. The metadata is kept when a SparkPlan is converted into SparkPlanInfo. SparkPlanInfo can be used to construct the Spark plan graph in UI.

However, the paths can be very large (e.g. it can be many partitions after partition pruning), while UI pages only require up to 100 bytes for the location metadata. We can reduce the paths stored in metadata to reduce memory usage.

### Why are the changes needed?

Reduce unnecessary memory cost.
In the heap dump of a driver, the SparkPlanInfo instances are quite large and it should be avoided:
![image](https://user-images.githubusercontent.com/1097932/82642318-8f65de00-9bc2-11ea-9c9c-f05c2b0e1c49.png)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #28610 from gengliangwang/improveLocationMetadata.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
(cherry picked from commit 9fdc2a0)
  • Loading branch information
wangyum authored and mingmwang committed Aug 11, 2020
1 parent dcd6c6a commit 2ca1fbf
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 4 deletions.
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2904,6 +2904,24 @@ private[spark] object Utils extends Logging {
props.forEach((k, v) => resultProps.put(k, v))
resultProps
}

/**
* Convert a sequence of `Path`s to a metadata string. When the length of metadata string
* exceeds `stopAppendingThreshold`, stop appending paths for saving memory.
*/
def buildLocationMetadata(paths: Seq[Path], stopAppendingThreshold: Int): String = {
val metadata = new StringBuilder("[")
var index: Int = 0
while (index < paths.length && metadata.length < stopAppendingThreshold) {
if (index > 0) {
metadata.append(", ")
}
metadata.append(paths(index).toString)
index += 1
}
metadata.append("]")
metadata.toString
}
}

private[util] object CallerContext extends Logging {
Expand Down
8 changes: 8 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,14 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(Utils.trimExceptCRLF(s"b${s}b") === s"b${s}b")
}
}

test("pathsToMetadata") {
val paths = (0 to 4).map(i => new Path(s"path$i"))
assert(Utils.buildLocationMetadata(paths, 5) == "[path0]")
assert(Utils.buildLocationMetadata(paths, 10) == "[path0, path1]")
assert(Utils.buildLocationMetadata(paths, 15) == "[path0, path1, path2]")
assert(Utils.buildLocationMetadata(paths, 25) == "[path0, path1, path2, path3]")
}
}

private class SimpleExtension
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ trait DataSourceScanExec extends LeafExecNode {
// Metadata that describes more details of this scan.
protected def metadata: Map[String, String]

protected val maxMetadataValueLength = 100

override def simpleString(maxFields: Int): String = {
val metadataEntries = metadata.toSeq.sorted.map {
case (key, value) =>
key + ": " + StringUtils.abbreviate(redact(value), 100)
key + ": " + StringUtils.abbreviate(redact(value), maxMetadataValueLength)
}
val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields)
redact(
Expand Down Expand Up @@ -335,7 +337,8 @@ case class FileSourceScanExec(
def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
val location = relation.location
val locationDesc =
location.getClass.getSimpleName + seqToString(location.rootPaths)
location.getClass.getSimpleName +
Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength)
val metadata =
Map(
"Format" -> relation.fileFormat.toString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin
override def hashCode(): Int = getClass.hashCode()

override def description(): String = {
val maxMetadataValueLength = 100
val locationDesc =
fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
fileIndex.getClass.getSimpleName +
Utils.buildLocationMetadata(fileIndex.rootPaths, maxMetadataValueLength)
val metadata: Map[String, String] = Map(
"ReadSchema" -> readDataSchema.catalogString,
"PartitionFilters" -> seqToString(partitionFilters),
Expand All @@ -105,7 +107,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin
case (key, value) =>
val redactedValue =
Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
key + ": " + StringUtils.abbreviate(redactedValue, 100)
key + ": " + StringUtils.abbreviate(redactedValue, maxMetadataValueLength)
}.mkString(", ")
s"${this.getClass.getSimpleName} $metadataStr"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution

import java.io.File

import scala.collection.mutable

import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -116,6 +118,30 @@ class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
assert(isIncluded(df.queryExecution, "Location"))
}
}

test("SPARK-31793: FileSourceScanExec metadata should contain limited file paths") {
withTempPath { path =>
val dir = path.getCanonicalPath
val partitionCol = "partitionCol"
spark.range(10)
.select("id", "id")
.toDF("value", partitionCol)
.write
.partitionBy(partitionCol)
.orc(dir)
val paths = (0 to 9).map(i => new File(dir, s"$partitionCol=$i").getCanonicalPath)
val plan = spark.read.orc(paths: _*).queryExecution.executedPlan
val location = plan collectFirst {
case f: FileSourceScanExec => f.metadata("Location")
}
assert(location.isDefined)
// The location metadata should at least contain one path
assert(location.get.contains(paths.head))
// If the temp path length is larger than 100, the metadata length should not exceed
// twice of the length; otherwise, the metadata length should be controlled within 200.
assert(location.get.length < Math.max(paths.head.length, 100) * 2)
}
}
}

/**
Expand Down

0 comments on commit 2ca1fbf

Please sign in to comment.