Skip to content

Commit

Permalink
[SPARK-29431][WebUI] Improve Sql tab cached dataframes
Browse files Browse the repository at this point in the history
  • Loading branch information
planga82 committed Oct 10, 2019
1 parent 822b39c commit 967b4f5
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -33,7 +34,8 @@ class SparkPlanInfo(
val simpleString: String,
val children: Seq[SparkPlanInfo],
val metadata: Map[String, String],
val metrics: Seq[SQLMetricInfo]) {
val metrics: Seq[SQLMetricInfo],
val relation: Seq[SparkPlanInfo] = Seq()) {

override def hashCode(): Int = {
// hashCode of simpleString should be good enough to distinguish the plans from each other
Expand Down Expand Up @@ -67,10 +69,16 @@ private[execution] object SparkPlanInfo {
case fileScan: FileSourceScanExec => fileScan.metadata
case _ => Map[String, String]()
}

val relation = plan match {
case inMemTab: InMemoryTableScanExec => Seq(fromSparkPlan(inMemTab.relation.cachedPlan))
case _ => Seq()
}

new SparkPlanInfo(
plan.nodeName,
plan.simpleString(SQLConf.get.maxToStringFields),
children.map(fromSparkPlan),
metadata, metrics)
metadata, metrics, relation)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ object SparkPlanGraph {
if (parent != null) {
edges += SparkPlanGraphEdge(node.id, parent.id)
}
planInfo.children.foreach(
planInfo.children.union(planInfo.relation).foreach(
buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph, exchanges))
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.ui

import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.test.SharedSparkSession

class SparkPlanInfoSuite extends SharedSparkSession{

import testImplicits._

def vaidateSparkPlanInfoRelation(sparkPlanInfo: SparkPlanInfo): Unit = {
sparkPlanInfo.nodeName match {
case "InMemoryTableScan" => assert(sparkPlanInfo.relation.length == 1)
case _ =>
assert(sparkPlanInfo.relation.length == 0)
sparkPlanInfo.children.foreach(vaidateSparkPlanInfoRelation)
}
}

test("SparkPlanInfo creation from SparkPlan with InMemoryTableScan node") {
val dfWithCache = Seq(
(1, 1),
(2, 2)
).toDF().filter("_1 > 1").cache().repartition(10)

val planInfoResult = SparkPlanInfo.fromSparkPlan(dfWithCache.queryExecution.executedPlan)

vaidateSparkPlanInfoRelation(planInfoResult)
}
}

0 comments on commit 967b4f5

Please sign in to comment.