From 834b8609793525a5a486013732d8c98e1c6e6504 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 18 Dec 2018 23:21:52 -0800 Subject: [PATCH 01/11] [SPARK-26366][SQL] ReplaceExceptWithFilter should consider NULL as False ## What changes were proposed in this pull request? In `ReplaceExceptWithFilter` we do not consider properly the case in which the condition returns NULL. Indeed, in that case, since negating NULL still returns NULL, so it is not true the assumption that negating the condition returns all the rows which didn't satisfy it, rows returning NULL may not be returned. This happens when constraints inferred by `InferFiltersFromConstraints` are not enough, as it happens with `OR` conditions. The rule had also problems with non-deterministic conditions: in such a scenario, this rule would change the probability of the output. The PR fixes these problem by: - returning False for the condition when it is Null (in this way we do return all the rows which didn't satisfy it); - avoiding any transformation when the condition is non-deterministic. ## How was this patch tested? added UTs Closes #23315 from mgaido91/SPARK-26366. Authored-by: Marco Gaido Signed-off-by: gatorsmile --- .../optimizer/ReplaceExceptWithFilter.scala | 32 ++++++++------ .../optimizer/ReplaceOperatorSuite.scala | 44 ++++++++++++++----- .../org/apache/spark/sql/DatasetSuite.scala | 11 +++++ .../org/apache/spark/sql/SQLQuerySuite.scala | 38 ++++++++++++++++ 4 files changed, 101 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala index efd3944eba7f5..4996d24dfd298 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala @@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule * Note: * Before flipping the filter condition of the right node, we should: * 1. Combine all it's [[Filter]]. - * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition). + * 2. Update the attribute references to the left node; + * 3. Add a Coalesce(condition, False) (to take into account of NULL values in the condition). */ object ReplaceExceptWithFilter extends Rule[LogicalPlan] { @@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] { plan.transform { case e @ Except(left, right, false) if isEligible(left, right) => - val newCondition = transformCondition(left, skipProject(right)) - newCondition.map { c => - Distinct(Filter(Not(c), left)) - }.getOrElse { + val filterCondition = combineFilters(skipProject(right)).asInstanceOf[Filter].condition + if (filterCondition.deterministic) { + transformCondition(left, filterCondition).map { c => + Distinct(Filter(Not(c), left)) + }.getOrElse { + e + } + } else { e } } } - private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = { - val filterCondition = - InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition - - val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap - - if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) { - Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) }) + private def transformCondition(plan: LogicalPlan, condition: Expression): Option[Expression] = { + val attributeNameMap: Map[String, Attribute] = plan.output.map(x => (x.name, x)).toMap + if (condition.references.forall(r => attributeNameMap.contains(r.name))) { + val rewrittenCondition = condition.transform { + case a: AttributeReference => attributeNameMap(a.name) + } + // We need to consider as False when the condition is NULL, otherwise we do not return those + // rows containing NULL which are instead filtered in the Except right plan + Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral))) } else { None } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala index 3b1b2d588ef67..c8e15c7da763e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala @@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not} +import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, If, Literal, Not} import org.apache.spark.sql.catalyst.expressions.aggregate.First import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types.BooleanType class ReplaceOperatorSuite extends PlanTest { @@ -65,8 +66,7 @@ class ReplaceOperatorSuite extends PlanTest { val correctAnswer = Aggregate(table1.output, table1.output, - Filter(Not((attributeA.isNotNull && attributeB.isNotNull) && - (attributeA >= 2 && attributeB < 1)), + Filter(Not(Coalesce(Seq(attributeA >= 2 && attributeB < 1, Literal.FalseLiteral))), Filter(attributeB === 2, Filter(attributeA === 1, table1)))).analyze comparePlans(optimized, correctAnswer) @@ -84,8 +84,8 @@ class ReplaceOperatorSuite extends PlanTest { val correctAnswer = Aggregate(table1.output, table1.output, - Filter(Not((attributeA.isNotNull && attributeB.isNotNull) && - (attributeA >= 2 && attributeB < 1)), table1)).analyze + Filter(Not(Coalesce(Seq(attributeA >= 2 && attributeB < 1, Literal.FalseLiteral))), + table1)).analyze comparePlans(optimized, correctAnswer) } @@ -104,8 +104,7 @@ class ReplaceOperatorSuite extends PlanTest { val correctAnswer = Aggregate(table1.output, table1.output, - Filter(Not((attributeA.isNotNull && attributeB.isNotNull) && - (attributeA >= 2 && attributeB < 1)), + Filter(Not(Coalesce(Seq(attributeA >= 2 && attributeB < 1, Literal.FalseLiteral))), Project(Seq(attributeA, attributeB), table1))).analyze comparePlans(optimized, correctAnswer) @@ -125,8 +124,7 @@ class ReplaceOperatorSuite extends PlanTest { val correctAnswer = Aggregate(table1.output, table1.output, - Filter(Not((attributeA.isNotNull && attributeB.isNotNull) && - (attributeA >= 2 && attributeB < 1)), + Filter(Not(Coalesce(Seq(attributeA >= 2 && attributeB < 1, Literal.FalseLiteral))), Filter(attributeB === 2, Filter(attributeA === 1, table1)))).analyze comparePlans(optimized, correctAnswer) @@ -146,8 +144,7 @@ class ReplaceOperatorSuite extends PlanTest { val correctAnswer = Aggregate(table1.output, table1.output, - Filter(Not((attributeA.isNotNull && attributeB.isNotNull) && - (attributeA === 1 && attributeB === 2)), + Filter(Not(Coalesce(Seq(attributeA === 1 && attributeB === 2, Literal.FalseLiteral))), Project(Seq(attributeA, attributeB), Filter(attributeB < 1, Filter(attributeA >= 2, table1))))).analyze @@ -229,4 +226,29 @@ class ReplaceOperatorSuite extends PlanTest { comparePlans(optimized, query) } + + test("SPARK-26366: ReplaceExceptWithFilter should handle properly NULL") { + val basePlan = LocalRelation(Seq('a.int, 'b.int)) + val otherPlan = basePlan.where('a.in(1, 2) || 'b.in()) + val except = Except(basePlan, otherPlan, false) + val result = OptimizeIn(Optimize.execute(except.analyze)) + val correctAnswer = Aggregate(basePlan.output, basePlan.output, + Filter(!Coalesce(Seq( + 'a.in(1, 2) || If('b.isNotNull, Literal.FalseLiteral, Literal(null, BooleanType)), + Literal.FalseLiteral)), + basePlan)).analyze + comparePlans(result, correctAnswer) + } + + test("SPARK-26366: ReplaceExceptWithFilter should not transform non-detrministic") { + val basePlan = LocalRelation(Seq('a.int, 'b.int)) + val otherPlan = basePlan.where('a > rand(1L)) + val except = Except(basePlan, otherPlan, false) + val result = Optimize.execute(except.analyze) + val condition = basePlan.output.zip(otherPlan.output).map { case (a1, a2) => + a1 <=> a2 }.reduce( _ && _) + val correctAnswer = Aggregate(basePlan.output, otherPlan.output, + Join(basePlan, otherPlan, LeftAnti, Option(condition))).analyze + comparePlans(result, correctAnswer) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 525c7cef39563..c90b15814a534 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1656,6 +1656,17 @@ class DatasetSuite extends QueryTest with SharedSQLContext { checkAnswer(df.groupBy(col("a")).agg(first(col("b"))), Seq(Row("0", BigDecimal.valueOf(0.1111)), Row("1", BigDecimal.valueOf(1.1111)))) } + + test("SPARK-26366: return nulls which are not filtered in except") { + val inputDF = sqlContext.createDataFrame( + sparkContext.parallelize(Seq(Row("0", "a"), Row("1", null))), + StructType(Seq( + StructField("a", StringType, nullable = true), + StructField("b", StringType, nullable = true)))) + + val exceptDF = inputDF.filter(col("a").isin("0") or col("b") > "c") + checkAnswer(inputDF.except(exceptDF), Seq(Row("1", null))) + } } case class TestDataUnion(x: Int, y: Int, z: Int) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4cc8a45391996..37a8815350a53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2899,6 +2899,44 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-26366: verify ReplaceExceptWithFilter") { + Seq(true, false).foreach { enabled => + withSQLConf(SQLConf.REPLACE_EXCEPT_WITH_FILTER.key -> enabled.toString) { + val df = spark.createDataFrame( + sparkContext.parallelize(Seq(Row(0, 3, 5), + Row(0, 3, null), + Row(null, 3, 5), + Row(0, null, 5), + Row(0, null, null), + Row(null, null, 5), + Row(null, 3, null), + Row(null, null, null))), + StructType(Seq(StructField("c1", IntegerType), + StructField("c2", IntegerType), + StructField("c3", IntegerType)))) + val where = "c2 >= 3 OR c1 >= 0" + val whereNullSafe = + """ + |(c2 IS NOT NULL AND c2 >= 3) + |OR (c1 IS NOT NULL AND c1 >= 0) + """.stripMargin + + val df_a = df.filter(where) + val df_b = df.filter(whereNullSafe) + checkAnswer(df.except(df_a), df.except(df_b)) + + val whereWithIn = "c2 >= 3 OR c1 in (2)" + val whereWithInNullSafe = + """ + |(c2 IS NOT NULL AND c2 >= 3) + """.stripMargin + val dfIn_a = df.filter(whereWithIn) + val dfIn_b = df.filter(whereWithInNullSafe) + checkAnswer(df.except(dfIn_a), df.except(dfIn_b)) + } + } + } } case class Foo(bar: Option[String]) From 08f74ada3656af401099aa79471ef8a1155a3f07 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 19 Dec 2018 09:41:30 -0800 Subject: [PATCH 02/11] [SPARK-26390][SQL] ColumnPruning rule should only do column pruning ## What changes were proposed in this pull request? This is a small clean up. By design catalyst rules should be orthogonal: each rule should have its own responsibility. However, the `ColumnPruning` rule does not only do column pruning, but also remove no-op project and window. This PR updates the `RemoveRedundantProject` rule to remove no-op window as well, and clean up the `ColumnPruning` rule to only do column pruning. ## How was this patch tested? existing tests Closes #23343 from cloud-fan/column-pruning. Authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/optimizer/Optimizer.scala | 23 +++++++++---------- .../optimizer/ColumnPruningSuite.scala | 7 +++--- .../optimizer/CombiningLimitsSuite.scala | 5 ++-- .../optimizer/JoinOptimizationSuite.scala | 1 + .../RemoveRedundantAliasAndProjectSuite.scala | 2 +- .../optimizer/RewriteSubquerySuite.scala | 2 +- .../optimizer/TransposeWindowSuite.scala | 2 +- 7 files changed, 21 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3eb6bca6ec976..44d5543114902 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -93,7 +93,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) RewriteCorrelatedScalarSubquery, EliminateSerialization, RemoveRedundantAliases, - RemoveRedundantProject, + RemoveNoopOperators, SimplifyExtractValueOps, CombineConcats) ++ extendedOperatorOptimizationRules @@ -177,7 +177,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) RewritePredicateSubquery, ColumnPruning, CollapseProject, - RemoveRedundantProject) :+ + RemoveNoopOperators) :+ Batch("UpdateAttributeReferences", Once, UpdateNullabilityInAttributeReferences) } @@ -403,11 +403,15 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { } /** - * Remove projections from the query plan that do not make any modifications. + * Remove no-op operators from the query plan that do not make any modifications. */ -object RemoveRedundantProject extends Rule[LogicalPlan] { +object RemoveNoopOperators extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case p @ Project(_, child) if p.output == child.output => child + // Eliminate no-op Projects + case p @ Project(_, child) if child.sameOutput(p) => child + + // Eliminate no-op Window + case w: Window if w.windowExpressions.isEmpty => w.child } } @@ -602,17 +606,12 @@ object ColumnPruning extends Rule[LogicalPlan] { p.copy(child = w.copy( windowExpressions = w.windowExpressions.filter(p.references.contains))) - // Eliminate no-op Window - case w: Window if w.windowExpressions.isEmpty => w.child - - // Eliminate no-op Projects - case p @ Project(_, child) if child.sameOutput(p) => child - // Can't prune the columns on LeafNode case p @ Project(_, _: LeafNode) => p // for all other logical plans that inherits the output from it's children - case p @ Project(_, child) => + // Project over project is handled by the first case, skip it here. + case p @ Project(_, child) if !child.isInstanceOf[Project] => val required = child.references ++ p.references if (!child.inputSet.subsetOf(required)) { val newChildren = child.children.map(c => prunedChild(c, required)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 8d7c9bf220bc2..57195d5fda7c5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -34,6 +34,7 @@ class ColumnPruningSuite extends PlanTest { val batches = Batch("Column pruning", FixedPoint(100), PushDownPredicate, ColumnPruning, + RemoveNoopOperators, CollapseProject) :: Nil } @@ -340,10 +341,8 @@ class ColumnPruningSuite extends PlanTest { test("Column pruning on Union") { val input1 = LocalRelation('a.int, 'b.string, 'c.double) val input2 = LocalRelation('c.int, 'd.string, 'e.double) - val query = Project('b :: Nil, - Union(input1 :: input2 :: Nil)).analyze - val expected = Project('b :: Nil, - Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze + val query = Project('b :: Nil, Union(input1 :: input2 :: Nil)).analyze + val expected = Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil).analyze comparePlans(Optimize.execute(query), expected) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index ef4b848924f06..b190dd5a7c220 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala @@ -27,8 +27,9 @@ class CombiningLimitsSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = - Batch("Filter Pushdown", FixedPoint(100), - ColumnPruning) :: + Batch("Column Pruning", FixedPoint(100), + ColumnPruning, + RemoveNoopOperators) :: Batch("Combine Limit", FixedPoint(10), CombineLimits) :: Batch("Constant Folding", FixedPoint(10), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index e9438b2eee550..6fe5e619d03ad 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -39,6 +39,7 @@ class JoinOptimizationSuite extends PlanTest { ReorderJoin, PushPredicateThroughJoin, ColumnPruning, + RemoveNoopOperators, CollapseProject) :: Nil } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala index 1973b5abb462d..3802dbf5d6e06 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala @@ -33,7 +33,7 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper FixedPoint(50), PushProjectionThroughUnion, RemoveRedundantAliases, - RemoveRedundantProject) :: Nil + RemoveNoopOperators) :: Nil } test("all expressions in project list are aliased child output") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala index 6b3739c372c3a..f00d22e6e96a6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala @@ -34,7 +34,7 @@ class RewriteSubquerySuite extends PlanTest { RewritePredicateSubquery, ColumnPruning, CollapseProject, - RemoveRedundantProject) :: Nil + RemoveNoopOperators) :: Nil } test("Column pruning after rewriting predicate subquery") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala index 58b3d1c98f3cd..4acd57832d2f6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor class TransposeWindowSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = - Batch("CollapseProject", FixedPoint(100), CollapseProject, RemoveRedundantProject) :: + Batch("CollapseProject", FixedPoint(100), CollapseProject, RemoveNoopOperators) :: Batch("FlipWindow", Once, CollapseWindow, TransposeWindow) :: Nil } From 61c443acd23c74ebcb20fd32e5e0ed6c1722b5dc Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 20 Dec 2018 10:41:45 +0800 Subject: [PATCH 03/11] [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and CODEGEN_FACTORY_MODE ## What changes were proposed in this pull request? For better test coverage, this pr proposed to use the 4 mixed config sets of `WHOLESTAGE_CODEGEN_ENABLED` and `CODEGEN_FACTORY_MODE` when running `SQLQueryTestSuite`: 1. WHOLESTAGE_CODEGEN_ENABLED=true, CODEGEN_FACTORY_MODE=CODEGEN_ONLY 2. WHOLESTAGE_CODEGEN_ENABLED=false, CODEGEN_FACTORY_MODE=CODEGEN_ONLY 3. WHOLESTAGE_CODEGEN_ENABLED=true, CODEGEN_FACTORY_MODE=NO_CODEGEN 4. WHOLESTAGE_CODEGEN_ENABLED=false, CODEGEN_FACTORY_MODE=NO_CODEGEN This pr also moved some existing tests into `ExplainSuite` because explain output results are different between codegen and interpreter modes. ## How was this patch tested? Existing tests. Closes #23213 from maropu/InterpreterModeTest. Authored-by: Takeshi Yamamuro Signed-off-by: Wenchen Fan --- .../resources/sql-tests/inputs/group-by.sql | 5 - .../sql-tests/inputs/inline-table.sql | 3 - .../resources/sql-tests/inputs/operators.sql | 21 -- .../inputs/sql-compatibility-functions.sql | 5 - .../sql-tests/inputs/string-functions.sql | 27 --- .../inputs/table-valued-functions.sql | 6 - .../sql-tests/results/group-by.sql.out | 30 +-- .../sql-tests/results/inline-table.sql.out | 32 +-- .../sql-tests/results/operators.sql.out | 204 +++++++----------- .../sql-compatibility-functions.sql.out | 61 ++---- .../results/string-functions.sql.out | 131 +++-------- .../results/table-valued-functions.sql.out | 41 +--- .../org/apache/spark/sql/ExplainSuite.scala | 133 +++++++++++- .../apache/spark/sql/SQLQueryTestSuite.scala | 51 ++--- 14 files changed, 281 insertions(+), 469 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql index ec263ea70bd4a..7e81ff1aba37b 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql @@ -141,8 +141,3 @@ SELECT every("true"); SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; SELECT k, v, some(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; SELECT k, v, any(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; - --- simple explain of queries having every/some/any agregates. Optimized --- plan should show the rewritten aggregate expression. -EXPLAIN EXTENDED SELECT k, every(v), some(v), any(v) FROM test_agg GROUP BY k; - diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index 41d316444ed6b..b3ec956cd178e 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -49,6 +49,3 @@ select * from values ("one", count(1)), ("two", 2) as data(a, b); -- string to timestamp select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b); - --- cross-join inline tables -EXPLAIN EXTENDED SELECT * FROM VALUES ('one', 1), ('three', null) CROSS JOIN VALUES ('one', 1), ('three', null); diff --git a/sql/core/src/test/resources/sql-tests/inputs/operators.sql b/sql/core/src/test/resources/sql-tests/inputs/operators.sql index 37f9cd44da7f2..ba14789d48db6 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/operators.sql @@ -29,27 +29,6 @@ select 2 * 5; select 5 % 3; select pmod(-7, 3); --- check operator precedence. --- We follow Oracle operator precedence in the table below that lists the levels of precedence --- among SQL operators from high to low: ------------------------------------------------------------------------------------------- --- Operator Operation ------------------------------------------------------------------------------------------- --- +, - identity, negation --- *, / multiplication, division --- +, -, || addition, subtraction, concatenation --- =, !=, <, >, <=, >=, IS NULL, LIKE, BETWEEN, IN comparison --- NOT exponentiation, logical negation --- AND conjunction --- OR disjunction ------------------------------------------------------------------------------------------- -explain select 'a' || 1 + 2; -explain select 1 - 2 || 'b'; -explain select 2 * 4 + 3 || 'b'; -explain select 3 + 1 || 'a' || 4 / 2; -explain select 1 == 1 OR 'a' || 'b' == 'ab'; -explain select 'a' || 'c' == 'ac' AND 2 == 3; - -- math functions select cot(1); select cot(null); diff --git a/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql index f1461032065ad..1ae49c8bfc76a 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql @@ -12,11 +12,6 @@ SELECT nullif(1, 2.1d), nullif(1, 1.0d); SELECT nvl(1, 2.1d), nvl(null, 2.1d); SELECT nvl2(null, 1, 2.1d), nvl2('n', 1, 2.1d); --- explain for these functions; use range to avoid constant folding -explain extended -select ifnull(id, 'x'), nullif(id, 'x'), nvl(id, 'x'), nvl2(id, 'x', 'y') -from range(2); - -- SPARK-16730 cast alias functions for Hive compatibility SELECT boolean(1), tinyint(1), smallint(1), int(1), bigint(1); SELECT float(1), double(1), decimal(1); diff --git a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql index 2effb43183d75..fbc231627e36f 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql @@ -5,10 +5,6 @@ select format_string(); -- A pipe operator for string concatenation select 'a' || 'b' || 'c'; --- Check if catalyst combine nested `Concat`s -EXPLAIN EXTENDED SELECT (col1 || col2 || col3 || col4) col -FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10)); - -- replace function select replace('abc', 'b', '123'); select replace('abc', 'b'); @@ -25,29 +21,6 @@ select left(null, -2), left("abcd", -2), left("abcd", 0), left("abcd", 'a'); select right("abcd", 2), right("abcd", 5), right("abcd", '2'), right("abcd", null); select right(null, -2), right("abcd", -2), right("abcd", 0), right("abcd", 'a'); --- turn off concatBinaryAsString -set spark.sql.function.concatBinaryAsString=false; - --- Check if catalyst combine nested `Concat`s if concatBinaryAsString=false -EXPLAIN SELECT ((col1 || col2) || (col3 || col4)) col -FROM ( - SELECT - string(id) col1, - string(id + 1) col2, - encode(string(id + 2), 'utf-8') col3, - encode(string(id + 3), 'utf-8') col4 - FROM range(10) -); - -EXPLAIN SELECT (col1 || (col3 || col4)) col -FROM ( - SELECT - string(id) col1, - encode(string(id + 2), 'utf-8') col3, - encode(string(id + 3), 'utf-8') col4 - FROM range(10) -); - -- split function SELECT split('aa1cc2ee3', '[1-9]+'); SELECT split('aa1cc2ee3', '[1-9]+', 2); diff --git a/sql/core/src/test/resources/sql-tests/inputs/table-valued-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/table-valued-functions.sql index 72cd8ca9d8722..6f14c8ca87821 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/table-valued-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/table-valued-functions.sql @@ -21,9 +21,3 @@ select * from range(1, null); -- range call with a mixed-case function name select * from RaNgE(2); - --- Explain -EXPLAIN select * from RaNgE(2); - --- cross-join table valued functions -EXPLAIN EXTENDED SELECT * FROM range(3) CROSS JOIN range(3); diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index 9a8d025331b67..daf47c4d0a39a 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 47 +-- Number of queries: 46 -- !query 0 @@ -459,31 +459,3 @@ struct --- !query 46 output -== Parsed Logical Plan == -'Aggregate ['k], ['k, unresolvedalias('every('v), None), unresolvedalias('some('v), None), unresolvedalias('any('v), None)] -+- 'UnresolvedRelation `test_agg` - -== Analyzed Logical Plan == -k: int, every(v): boolean, some(v): boolean, any(v): boolean -Aggregate [k#x], [k#x, every(v#x) AS every(v)#x, some(v#x) AS some(v)#x, any(v#x) AS any(v)#x] -+- SubqueryAlias `test_agg` - +- Project [k#x, v#x] - +- SubqueryAlias `test_agg` - +- LocalRelation [k#x, v#x] - -== Optimized Logical Plan == -Aggregate [k#x], [k#x, min(v#x) AS every(v)#x, max(v#x) AS some(v)#x, max(v#x) AS any(v)#x] -+- LocalRelation [k#x, v#x] - -== Physical Plan == -*HashAggregate(keys=[k#x], functions=[min(v#x), max(v#x)], output=[k#x, every(v)#x, some(v)#x, any(v)#x]) -+- Exchange hashpartitioning(k#x, 200) - +- *HashAggregate(keys=[k#x], functions=[partial_min(v#x), partial_max(v#x)], output=[k#x, min#x, max#x]) - +- LocalTableScan [k#x, v#x] diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index c065ce5012929..4e80f0bda5513 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 18 +-- Number of queries: 17 -- !query 0 @@ -151,33 +151,3 @@ select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991- struct> -- !query 16 output 1991-12-06 00:00:00 [1991-12-06 01:00:00.0,1991-12-06 12:00:00.0] - - --- !query 17 -EXPLAIN EXTENDED SELECT * FROM VALUES ('one', 1), ('three', null) CROSS JOIN VALUES ('one', 1), ('three', null) --- !query 17 schema -struct --- !query 17 output -== Parsed Logical Plan == -'Project [*] -+- 'Join Cross - :- 'UnresolvedInlineTable [col1, col2], [List(one, 1), List(three, null)] - +- 'UnresolvedInlineTable [col1, col2], [List(one, 1), List(three, null)] - -== Analyzed Logical Plan == -col1: string, col2: int, col1: string, col2: int -Project [col1#x, col2#x, col1#x, col2#x] -+- Join Cross - :- LocalRelation [col1#x, col2#x] - +- LocalRelation [col1#x, col2#x] - -== Optimized Logical Plan == -Join Cross -:- LocalRelation [col1#x, col2#x] -+- LocalRelation [col1#x, col2#x] - -== Physical Plan == -BroadcastNestedLoopJoin BuildRight, Cross -:- LocalTableScan [col1#x, col2#x] -+- BroadcastExchange IdentityBroadcastMode - +- LocalTableScan [col1#x, col2#x] diff --git a/sql/core/src/test/resources/sql-tests/results/operators.sql.out b/sql/core/src/test/resources/sql-tests/results/operators.sql.out index 570b281353f3d..e0cbd575bc346 100644 --- a/sql/core/src/test/resources/sql-tests/results/operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/operators.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 55 +-- Number of queries: 49 -- !query 0 @@ -195,260 +195,200 @@ struct -- !query 24 -explain select 'a' || 1 + 2 +select cot(1) -- !query 24 schema -struct +struct -- !query 24 output -== Physical Plan == -*Project [null AS (CAST(concat(a, CAST(1 AS STRING)) AS DOUBLE) + CAST(2 AS DOUBLE))#x] -+- *Scan OneRowRelation[] +0.6420926159343306 -- !query 25 -explain select 1 - 2 || 'b' +select cot(null) -- !query 25 schema -struct +struct -- !query 25 output -== Physical Plan == -*Project [-1b AS concat(CAST((1 - 2) AS STRING), b)#x] -+- *Scan OneRowRelation[] +NULL -- !query 26 -explain select 2 * 4 + 3 || 'b' +select cot(0) -- !query 26 schema -struct +struct -- !query 26 output -== Physical Plan == -*Project [11b AS concat(CAST(((2 * 4) + 3) AS STRING), b)#x] -+- *Scan OneRowRelation[] +Infinity -- !query 27 -explain select 3 + 1 || 'a' || 4 / 2 +select cot(-1) -- !query 27 schema -struct +struct -- !query 27 output -== Physical Plan == -*Project [4a2.0 AS concat(concat(CAST((3 + 1) AS STRING), a), CAST((CAST(4 AS DOUBLE) / CAST(2 AS DOUBLE)) AS STRING))#x] -+- *Scan OneRowRelation[] +-0.6420926159343306 -- !query 28 -explain select 1 == 1 OR 'a' || 'b' == 'ab' +select ceiling(0) -- !query 28 schema -struct +struct -- !query 28 output -== Physical Plan == -*Project [true AS ((1 = 1) OR (concat(a, b) = ab))#x] -+- *Scan OneRowRelation[] +0 -- !query 29 -explain select 'a' || 'c' == 'ac' AND 2 == 3 +select ceiling(1) -- !query 29 schema -struct +struct -- !query 29 output -== Physical Plan == -*Project [false AS ((concat(a, c) = ac) AND (2 = 3))#x] -+- *Scan OneRowRelation[] +1 -- !query 30 -select cot(1) +select ceil(1234567890123456) -- !query 30 schema -struct +struct -- !query 30 output -0.6420926159343306 +1234567890123456 -- !query 31 -select cot(null) +select ceiling(1234567890123456) -- !query 31 schema -struct +struct -- !query 31 output -NULL +1234567890123456 -- !query 32 -select cot(0) +select ceil(0.01) -- !query 32 schema -struct +struct -- !query 32 output -Infinity +1 -- !query 33 -select cot(-1) +select ceiling(-0.10) -- !query 33 schema -struct +struct -- !query 33 output --0.6420926159343306 +0 -- !query 34 -select ceiling(0) +select floor(0) -- !query 34 schema -struct +struct -- !query 34 output 0 -- !query 35 -select ceiling(1) +select floor(1) -- !query 35 schema -struct +struct -- !query 35 output 1 -- !query 36 -select ceil(1234567890123456) +select floor(1234567890123456) -- !query 36 schema -struct +struct -- !query 36 output 1234567890123456 -- !query 37 -select ceiling(1234567890123456) --- !query 37 schema -struct --- !query 37 output -1234567890123456 - - --- !query 38 -select ceil(0.01) --- !query 38 schema -struct --- !query 38 output -1 - - --- !query 39 -select ceiling(-0.10) --- !query 39 schema -struct --- !query 39 output -0 - - --- !query 40 -select floor(0) --- !query 40 schema -struct --- !query 40 output -0 - - --- !query 41 -select floor(1) --- !query 41 schema -struct --- !query 41 output -1 - - --- !query 42 -select floor(1234567890123456) --- !query 42 schema -struct --- !query 42 output -1234567890123456 - - --- !query 43 select floor(0.01) --- !query 43 schema +-- !query 37 schema struct --- !query 43 output +-- !query 37 output 0 --- !query 44 +-- !query 38 select floor(-0.10) --- !query 44 schema +-- !query 38 schema struct --- !query 44 output +-- !query 38 output -1 --- !query 45 +-- !query 39 select 1 > 0.00001 --- !query 45 schema +-- !query 39 schema struct<(CAST(1 AS BIGINT) > 0):boolean> --- !query 45 output +-- !query 39 output true --- !query 46 +-- !query 40 select mod(7, 2), mod(7, 0), mod(0, 2), mod(7, null), mod(null, 2), mod(null, null) --- !query 46 schema +-- !query 40 schema struct<(7 % 2):int,(7 % 0):int,(0 % 2):int,(7 % CAST(NULL AS INT)):int,(CAST(NULL AS INT) % 2):int,(CAST(NULL AS DOUBLE) % CAST(NULL AS DOUBLE)):double> --- !query 46 output +-- !query 40 output 1 NULL 0 NULL NULL NULL --- !query 47 +-- !query 41 select BIT_LENGTH('abc') --- !query 47 schema +-- !query 41 schema struct --- !query 47 output +-- !query 41 output 24 --- !query 48 +-- !query 42 select CHAR_LENGTH('abc') --- !query 48 schema +-- !query 42 schema struct --- !query 48 output +-- !query 42 output 3 --- !query 49 +-- !query 43 select CHARACTER_LENGTH('abc') --- !query 49 schema +-- !query 43 schema struct --- !query 49 output +-- !query 43 output 3 --- !query 50 +-- !query 44 select OCTET_LENGTH('abc') --- !query 50 schema +-- !query 44 schema struct --- !query 50 output +-- !query 44 output 3 --- !query 51 +-- !query 45 select abs(-3.13), abs('-2.19') --- !query 51 schema +-- !query 45 schema struct --- !query 51 output +-- !query 45 output 3.13 2.19 --- !query 52 +-- !query 46 select positive('-1.11'), positive(-1.11), negative('-1.11'), negative(-1.11) --- !query 52 schema +-- !query 46 schema struct<(+ CAST(-1.11 AS DOUBLE)):double,(+ -1.11):decimal(3,2),(- CAST(-1.11 AS DOUBLE)):double,(- -1.11):decimal(3,2)> --- !query 52 output +-- !query 46 output -1.11 -1.11 1.11 1.11 --- !query 53 +-- !query 47 select pmod(-7, 2), pmod(0, 2), pmod(7, 0), pmod(7, null), pmod(null, 2), pmod(null, null) --- !query 53 schema +-- !query 47 schema struct --- !query 53 output +-- !query 47 output 1 0 NULL NULL NULL NULL --- !query 54 +-- !query 48 select pmod(cast(3.13 as decimal), cast(0 as decimal)), pmod(cast(2 as smallint), cast(0 as smallint)) --- !query 54 schema +-- !query 48 schema struct --- !query 54 output +-- !query 48 output NULL NULL diff --git a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out index e035505f15d28..69a8e958000db 100644 --- a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 15 +-- Number of queries: 14 -- !query 0 @@ -67,74 +67,49 @@ struct -- !query 8 -explain extended -select ifnull(id, 'x'), nullif(id, 'x'), nvl(id, 'x'), nvl2(id, 'x', 'y') -from range(2) --- !query 8 schema -struct --- !query 8 output -== Parsed Logical Plan == -'Project [unresolvedalias('ifnull('id, x), None), unresolvedalias('nullif('id, x), None), unresolvedalias('nvl('id, x), None), unresolvedalias('nvl2('id, x, y), None)] -+- 'UnresolvedTableValuedFunction range, [2] - -== Analyzed Logical Plan == -ifnull(`id`, 'x'): string, nullif(`id`, 'x'): bigint, nvl(`id`, 'x'): string, nvl2(`id`, 'x', 'y'): string -Project [ifnull(id#xL, x) AS ifnull(`id`, 'x')#x, nullif(id#xL, x) AS nullif(`id`, 'x')#xL, nvl(id#xL, x) AS nvl(`id`, 'x')#x, nvl2(id#xL, x, y) AS nvl2(`id`, 'x', 'y')#x] -+- Range (0, 2, step=1, splits=None) - -== Optimized Logical Plan == -Project [coalesce(cast(id#xL as string), x) AS ifnull(`id`, 'x')#x, id#xL AS nullif(`id`, 'x')#xL, coalesce(cast(id#xL as string), x) AS nvl(`id`, 'x')#x, x AS nvl2(`id`, 'x', 'y')#x] -+- Range (0, 2, step=1, splits=None) - -== Physical Plan == -*Project [coalesce(cast(id#xL as string), x) AS ifnull(`id`, 'x')#x, id#xL AS nullif(`id`, 'x')#xL, coalesce(cast(id#xL as string), x) AS nvl(`id`, 'x')#x, x AS nvl2(`id`, 'x', 'y')#x] -+- *Range (0, 2, step=1, splits=2) - - --- !query 9 SELECT boolean(1), tinyint(1), smallint(1), int(1), bigint(1) --- !query 9 schema +-- !query 8 schema struct --- !query 9 output +-- !query 8 output true 1 1 1 1 --- !query 10 +-- !query 9 SELECT float(1), double(1), decimal(1) --- !query 10 schema +-- !query 9 schema struct --- !query 10 output +-- !query 9 output 1.0 1.0 1 --- !query 11 +-- !query 10 SELECT date("2014-04-04"), timestamp(date("2014-04-04")) --- !query 11 schema +-- !query 10 schema struct --- !query 11 output +-- !query 10 output 2014-04-04 2014-04-04 00:00:00 --- !query 12 +-- !query 11 SELECT string(1, 2) --- !query 12 schema +-- !query 11 schema struct<> --- !query 12 output +-- !query 11 output org.apache.spark.sql.AnalysisException Function string accepts only one argument; line 1 pos 7 --- !query 13 +-- !query 12 CREATE TEMPORARY VIEW tempView1 AS VALUES (1, NAMED_STRUCT('col1', 'gamma', 'col2', 'delta')) AS T(id, st) --- !query 13 schema +-- !query 12 schema struct<> --- !query 13 output +-- !query 12 output --- !query 14 +-- !query 13 SELECT nvl(st.col1, "value"), count(*) FROM from tempView1 GROUP BY nvl(st.col1, "value") --- !query 14 schema +-- !query 13 schema struct --- !query 14 output +-- !query 13 output gamma 1 diff --git a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out index e8f2e0a81455a..25d93b2063146 100644 --- a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 17 +-- Number of queries: 13 -- !query 0 @@ -29,151 +29,80 @@ abc -- !query 3 -EXPLAIN EXTENDED SELECT (col1 || col2 || col3 || col4) col -FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10)) --- !query 3 schema -struct --- !query 3 output -== Parsed Logical Plan == -'Project [concat(concat(concat('col1, 'col2), 'col3), 'col4) AS col#x] -+- 'SubqueryAlias `__auto_generated_subquery_name` - +- 'Project ['id AS col1#x, 'id AS col2#x, 'id AS col3#x, 'id AS col4#x] - +- 'UnresolvedTableValuedFunction range, [10] - -== Analyzed Logical Plan == -col: string -Project [concat(concat(concat(cast(col1#xL as string), cast(col2#xL as string)), cast(col3#xL as string)), cast(col4#xL as string)) AS col#x] -+- SubqueryAlias `__auto_generated_subquery_name` - +- Project [id#xL AS col1#xL, id#xL AS col2#xL, id#xL AS col3#xL, id#xL AS col4#xL] - +- Range (0, 10, step=1, splits=None) - -== Optimized Logical Plan == -Project [concat(cast(id#xL as string), cast(id#xL as string), cast(id#xL as string), cast(id#xL as string)) AS col#x] -+- Range (0, 10, step=1, splits=None) - -== Physical Plan == -*Project [concat(cast(id#xL as string), cast(id#xL as string), cast(id#xL as string), cast(id#xL as string)) AS col#x] -+- *Range (0, 10, step=1, splits=2) - - --- !query 4 select replace('abc', 'b', '123') --- !query 4 schema +-- !query 3 schema struct --- !query 4 output +-- !query 3 output a123c --- !query 5 +-- !query 4 select replace('abc', 'b') --- !query 5 schema +-- !query 4 schema struct --- !query 5 output +-- !query 4 output ac --- !query 6 +-- !query 5 select length(uuid()), (uuid() <> uuid()) --- !query 6 schema +-- !query 5 schema struct --- !query 6 output +-- !query 5 output 36 true --- !query 7 +-- !query 6 select position('bar' in 'foobarbar'), position(null, 'foobarbar'), position('aaads', null) --- !query 7 schema +-- !query 6 schema struct --- !query 7 output +-- !query 6 output 4 NULL NULL --- !query 8 +-- !query 7 select left("abcd", 2), left("abcd", 5), left("abcd", '2'), left("abcd", null) --- !query 8 schema +-- !query 7 schema struct --- !query 8 output +-- !query 7 output ab abcd ab NULL --- !query 9 +-- !query 8 select left(null, -2), left("abcd", -2), left("abcd", 0), left("abcd", 'a') --- !query 9 schema +-- !query 8 schema struct --- !query 9 output +-- !query 8 output NULL NULL --- !query 10 +-- !query 9 select right("abcd", 2), right("abcd", 5), right("abcd", '2'), right("abcd", null) --- !query 10 schema +-- !query 9 schema struct --- !query 10 output +-- !query 9 output cd abcd cd NULL --- !query 11 +-- !query 10 select right(null, -2), right("abcd", -2), right("abcd", 0), right("abcd", 'a') --- !query 11 schema +-- !query 10 schema struct --- !query 11 output +-- !query 10 output NULL NULL --- !query 12 -set spark.sql.function.concatBinaryAsString=false --- !query 12 schema -struct --- !query 12 output -spark.sql.function.concatBinaryAsString false - - --- !query 13 -EXPLAIN SELECT ((col1 || col2) || (col3 || col4)) col -FROM ( - SELECT - string(id) col1, - string(id + 1) col2, - encode(string(id + 2), 'utf-8') col3, - encode(string(id + 3), 'utf-8') col4 - FROM range(10) -) --- !query 13 schema -struct --- !query 13 output -== Physical Plan == -*Project [concat(cast(id#xL as string), cast((id#xL + 1) as string), cast(encode(cast((id#xL + 2) as string), utf-8) as string), cast(encode(cast((id#xL + 3) as string), utf-8) as string)) AS col#x] -+- *Range (0, 10, step=1, splits=2) - - --- !query 14 -EXPLAIN SELECT (col1 || (col3 || col4)) col -FROM ( - SELECT - string(id) col1, - encode(string(id + 2), 'utf-8') col3, - encode(string(id + 3), 'utf-8') col4 - FROM range(10) -) --- !query 14 schema -struct --- !query 14 output -== Physical Plan == -*Project [concat(cast(id#xL as string), cast(encode(cast((id#xL + 2) as string), utf-8) as string), cast(encode(cast((id#xL + 3) as string), utf-8) as string)) AS col#x] -+- *Range (0, 10, step=1, splits=2) - - --- !query 15 +-- !query 11 SELECT split('aa1cc2ee3', '[1-9]+') --- !query 15 schema +-- !query 11 schema struct> --- !query 15 output +-- !query 11 output ["aa","cc","ee",""] --- !query 16 +-- !query 12 SELECT split('aa1cc2ee3', '[1-9]+', 2) --- !query 16 schema +-- !query 12 schema struct> --- !query 16 output +-- !query 12 output ["aa","cc2ee3"] diff --git a/sql/core/src/test/resources/sql-tests/results/table-valued-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/table-valued-functions.sql.out index 94af9181225d6..fdbea0ee90720 100644 --- a/sql/core/src/test/resources/sql-tests/results/table-valued-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/table-valued-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 10 +-- Number of queries: 8 -- !query 0 @@ -99,42 +99,3 @@ struct -- !query 7 output 0 1 - - --- !query 8 -EXPLAIN select * from RaNgE(2) --- !query 8 schema -struct --- !query 8 output -== Physical Plan == -*Range (0, 2, step=1, splits=2) - - --- !query 9 -EXPLAIN EXTENDED SELECT * FROM range(3) CROSS JOIN range(3) --- !query 9 schema -struct --- !query 9 output -== Parsed Logical Plan == -'Project [*] -+- 'Join Cross - :- 'UnresolvedTableValuedFunction range, [3] - +- 'UnresolvedTableValuedFunction range, [3] - -== Analyzed Logical Plan == -id: bigint, id: bigint -Project [id#xL, id#xL] -+- Join Cross - :- Range (0, 3, step=1, splits=None) - +- Range (0, 3, step=1, splits=None) - -== Optimized Logical Plan == -Join Cross -:- Range (0, 3, step=1, splits=None) -+- Range (0, 3, step=1, splits=None) - -== Physical Plan == -BroadcastNestedLoopJoin BuildRight, Cross -:- *Range (0, 3, step=1, splits=2) -+- BroadcastExchange IdentityBroadcastMode - +- *Range (0, 3, step=1, splits=2) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 56d300e30a58e..ce475922eb5e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.StructType @@ -29,10 +30,11 @@ class ExplainSuite extends QueryTest with SharedSQLContext { private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = { val output = new java.io.ByteArrayOutputStream() Console.withOut(output) { - df.explain(extended = false) + df.explain(extended = true) } + val normalizedOutput = output.toString.replaceAll("#\\d+", "#x") for (key <- keywords) { - assert(output.toString.contains(key)) + assert(normalizedOutput.contains(key)) } } @@ -53,6 +55,133 @@ class ExplainSuite extends QueryTest with SharedSQLContext { checkKeywordsExistsInExplain(df, keywords = "InMemoryRelation", "StorageLevel(disk, memory, deserialized, 1 replicas)") } + + test("optimized plan should show the rewritten aggregate expression") { + withTempView("test_agg") { + sql( + """ + |CREATE TEMPORARY VIEW test_agg AS SELECT * FROM VALUES + | (1, true), (1, false), + | (2, true), + | (3, false), (3, null), + | (4, null), (4, null), + | (5, null), (5, true), (5, false) AS test_agg(k, v) + """.stripMargin) + + // simple explain of queries having every/some/any aggregates. Optimized + // plan should show the rewritten aggregate expression. + val df = sql("SELECT k, every(v), some(v), any(v) FROM test_agg GROUP BY k") + checkKeywordsExistsInExplain(df, + "Aggregate [k#x], [k#x, min(v#x) AS every(v)#x, max(v#x) AS some(v)#x, " + + "max(v#x) AS any(v)#x]") + } + } + + test("explain inline tables cross-joins") { + val df = sql( + """ + |SELECT * FROM VALUES ('one', 1), ('three', null) + | CROSS JOIN VALUES ('one', 1), ('three', null) + """.stripMargin) + checkKeywordsExistsInExplain(df, + "Join Cross", + ":- LocalRelation [col1#x, col2#x]", + "+- LocalRelation [col1#x, col2#x]") + } + + test("explain table valued functions") { + checkKeywordsExistsInExplain(sql("select * from RaNgE(2)"), "Range (0, 2, step=1, splits=None)") + checkKeywordsExistsInExplain(sql("SELECT * FROM range(3) CROSS JOIN range(3)"), + "Join Cross", + ":- Range (0, 3, step=1, splits=None)", + "+- Range (0, 3, step=1, splits=None)") + } + + test("explain string functions") { + // Check if catalyst combine nested `Concat`s + val df1 = sql( + """ + |SELECT (col1 || col2 || col3 || col4) col + | FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10)) + """.stripMargin) + checkKeywordsExistsInExplain(df1, + "Project [concat(cast(id#xL as string), cast(id#xL as string), cast(id#xL as string)" + + ", cast(id#xL as string)) AS col#x]") + + // Check if catalyst combine nested `Concat`s if concatBinaryAsString=false + withSQLConf(SQLConf.CONCAT_BINARY_AS_STRING.key -> "false") { + val df2 = sql( + """ + |SELECT ((col1 || col2) || (col3 || col4)) col + |FROM ( + | SELECT + | string(id) col1, + | string(id + 1) col2, + | encode(string(id + 2), 'utf-8') col3, + | encode(string(id + 3), 'utf-8') col4 + | FROM range(10) + |) + """.stripMargin) + checkKeywordsExistsInExplain(df2, + "Project [concat(cast(id#xL as string), cast((id#xL + 1) as string), " + + "cast(encode(cast((id#xL + 2) as string), utf-8) as string), " + + "cast(encode(cast((id#xL + 3) as string), utf-8) as string)) AS col#x]") + + val df3 = sql( + """ + |SELECT (col1 || (col3 || col4)) col + |FROM ( + | SELECT + | string(id) col1, + | encode(string(id + 2), 'utf-8') col3, + | encode(string(id + 3), 'utf-8') col4 + | FROM range(10) + |) + """.stripMargin) + checkKeywordsExistsInExplain(df3, + "Project [concat(cast(id#xL as string), " + + "cast(encode(cast((id#xL + 2) as string), utf-8) as string), " + + "cast(encode(cast((id#xL + 3) as string), utf-8) as string)) AS col#x]") + } + } + + test("check operator precedence") { + // We follow Oracle operator precedence in the table below that lists the levels + // of precedence among SQL operators from high to low: + // --------------------------------------------------------------------------------------- + // Operator Operation + // --------------------------------------------------------------------------------------- + // +, - identity, negation + // *, / multiplication, division + // +, -, || addition, subtraction, concatenation + // =, !=, <, >, <=, >=, IS NULL, LIKE, BETWEEN, IN comparison + // NOT exponentiation, logical negation + // AND conjunction + // OR disjunction + // --------------------------------------------------------------------------------------- + checkKeywordsExistsInExplain(sql("select 'a' || 1 + 2"), + "Project [null AS (CAST(concat(a, CAST(1 AS STRING)) AS DOUBLE) + CAST(2 AS DOUBLE))#x]") + checkKeywordsExistsInExplain(sql("select 1 - 2 || 'b'"), + "Project [-1b AS concat(CAST((1 - 2) AS STRING), b)#x]") + checkKeywordsExistsInExplain(sql("select 2 * 4 + 3 || 'b'"), + "Project [11b AS concat(CAST(((2 * 4) + 3) AS STRING), b)#x]") + checkKeywordsExistsInExplain(sql("select 3 + 1 || 'a' || 4 / 2"), + "Project [4a2.0 AS concat(concat(CAST((3 + 1) AS STRING), a), " + + "CAST((CAST(4 AS DOUBLE) / CAST(2 AS DOUBLE)) AS STRING))#x]") + checkKeywordsExistsInExplain(sql("select 1 == 1 OR 'a' || 'b' == 'ab'"), + "Project [true AS ((1 = 1) OR (concat(a, b) = ab))#x]") + checkKeywordsExistsInExplain(sql("select 'a' || 'c' == 'ac' AND 2 == 3"), + "Project [false AS ((concat(a, c) = ac) AND (2 = 3))#x]") + } + + test("explain for these functions; use range to avoid constant folding") { + val df = sql("select ifnull(id, 'x'), nullif(id, 'x'), nvl(id, 'x'), nvl2(id, 'x', 'y') " + + "from range(2)") + checkKeywordsExistsInExplain(df, + "Project [coalesce(cast(id#xL as string), x) AS ifnull(`id`, 'x')#x, " + + "id#xL AS nullif(`id`, 'x')#xL, coalesce(cast(id#xL as string), x) AS nvl(`id`, 'x')#x, " + + "x AS nvl2(`id`, 'x', 'y')#x]") + } } case class ExplainSingleData(id: Int) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index cf4585bf7ac6c..b2515226d9a14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -137,28 +137,39 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { } } + // For better test coverage, runs the tests on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED + // and CODEGEN_FACTORY_MODE. + private lazy val codegenConfigSets = Array( + ("true", "CODEGEN_ONLY"), + ("false", "CODEGEN_ONLY"), + ("false", "NO_CODEGEN") + ).map { case (wholeStageCodegenEnabled, codegenFactoryMode) => + Array(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageCodegenEnabled, + SQLConf.CODEGEN_FACTORY_MODE.key -> codegenFactoryMode) + } + /** Run a test case. */ private def runTest(testCase: TestCase): Unit = { val input = fileToString(new File(testCase.inputFile)) val (comments, code) = input.split("\n").partition(_.startsWith("--")) - // Runs all the tests on both codegen-only and interpreter modes - val codegenConfigSets = Array(CODEGEN_ONLY, NO_CODEGEN).map { - case codegenFactoryMode => - Array(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenFactoryMode.toString) - } - val configSets = { - val configLines = comments.filter(_.startsWith("--SET")).map(_.substring(5)) - val configs = configLines.map(_.split(",").map { confAndValue => - val (conf, value) = confAndValue.span(_ != '=') - conf.trim -> value.substring(1).trim - }) - // When we are regenerating the golden files, we don't need to set any config as they - // all need to return the same result - if (regenerateGoldenFiles) { - Array.empty[Array[(String, String)]] - } else { + // List of SQL queries to run + // note: this is not a robust way to split queries using semicolon, but works for now. + val queries = code.mkString("\n").split("(?<=[^\\\\]);").map(_.trim).filter(_ != "").toSeq + + // When we are regenerating the golden files, we don't need to set any config as they + // all need to return the same result + if (regenerateGoldenFiles) { + runQueries(queries, testCase.resultFile, None) + } else { + val configSets = { + val configLines = comments.filter(_.startsWith("--SET")).map(_.substring(5)) + val configs = configLines.map(_.split(",").map { confAndValue => + val (conf, value) = confAndValue.span(_ != '=') + conf.trim -> value.substring(1).trim + }) + if (configs.nonEmpty) { codegenConfigSets.flatMap { codegenConfig => configs.map { config => @@ -169,15 +180,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { codegenConfigSets } } - } - // List of SQL queries to run - // note: this is not a robust way to split queries using semicolon, but works for now. - val queries = code.mkString("\n").split("(?<=[^\\\\]);").map(_.trim).filter(_ != "").toSeq - - if (configSets.isEmpty) { - runQueries(queries, testCase.resultFile, None) - } else { configSets.foreach { configSet => try { runQueries(queries, testCase.resultFile, Some(configSet)) From 5ad03607d1487e7ab3e3b6d00eef9c4028ed4975 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 20 Dec 2018 10:47:24 +0800 Subject: [PATCH 04/11] [SPARK-25271][SQL] Hive ctas commands should use data source if it is convertible ## What changes were proposed in this pull request? In Spark 2.3.0 and previous versions, Hive CTAS command will convert to use data source to write data into the table when the table is convertible. This behavior is controlled by the configs like HiveUtils.CONVERT_METASTORE_ORC and HiveUtils.CONVERT_METASTORE_PARQUET. In 2.3.1, we drop this optimization by mistake in the PR [SPARK-22977](https://github.com/apache/spark/pull/20521/files#r217254430). Since that Hive CTAS command only uses Hive Serde to write data. This patch adds this optimization back to Hive CTAS command. This patch adds OptimizedCreateHiveTableAsSelectCommand which uses data source to write data. ## How was this patch tested? Added test. Closes #22514 from viirya/SPARK-25271-2. Authored-by: Liang-Chi Hsieh Signed-off-by: Wenchen Fan --- .../spark/sql/execution/command/ddl.scala | 8 ++ .../datasources/DataSourceStrategy.scala | 12 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 43 +++++- .../spark/sql/hive/HiveStrategies.scala | 62 +++----- .../org/apache/spark/sql/hive/HiveUtils.scala | 8 ++ .../CreateHiveTableAsSelectCommand.scala | 134 +++++++++++++----- .../spark/sql/hive/HiveParquetSuite.scala | 14 ++ .../sql/hive/execution/SQLQuerySuite.scala | 40 ++++++ 8 files changed, 230 insertions(+), 91 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index e1faecedd20ed..096481f68275d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -820,6 +820,14 @@ object DDLUtils { table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER } + def readHiveTable(table: CatalogTable): HiveTableRelation = { + HiveTableRelation( + table, + // Hive table columns are always nullable. + table.dataSchema.asNullable.toAttributes, + table.partitionSchema.asNullable.toAttributes) + } + /** * Throws a standard error for actions that require partitionProvider = hive. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index b304e2da6e1cf..b5cf8c9515bfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -244,27 +244,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] }) } - private def readHiveTable(table: CatalogTable): LogicalPlan = { - HiveTableRelation( - table, - // Hive table columns are always nullable. - table.dataSchema.asNullable.toAttributes, - table.partitionSchema.asNullable.toAttributes) - } - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => i.copy(table = readDataSourceTable(tableMeta)) case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) => - i.copy(table = readHiveTable(tableMeta)) + i.copy(table = DDLUtils.readHiveTable(tableMeta)) case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) => readDataSourceTable(tableMeta) case UnresolvedCatalogRelation(tableMeta) => - readHiveTable(tableMeta) + DDLUtils.readHiveTable(tableMeta) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 5823548a8063c..03f4b8d83e353 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import java.util.Locale + import scala.util.control.NonFatal import com.google.common.util.concurrent.Striped @@ -29,6 +31,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._ import org.apache.spark.sql.types._ @@ -113,7 +117,44 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } - def convertToLogicalRelation( + // Return true for Apache ORC and Hive ORC-related configuration names. + // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`. + private def isOrcProperty(key: String) = + key.startsWith("orc.") || key.contains(".orc.") + + private def isParquetProperty(key: String) = + key.startsWith("parquet.") || key.contains(".parquet.") + + def convert(relation: HiveTableRelation): LogicalRelation = { + val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) + + // Consider table and storage properties. For properties existing in both sides, storage + // properties will supersede table properties. + if (serde.contains("parquet")) { + val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++ + relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA -> + SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString) + convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") + } else { + val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++ + relation.tableMeta.storage.properties + if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") { + convertToLogicalRelation( + relation, + options, + classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat], + "orc") + } else { + convertToLogicalRelation( + relation, + options, + classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], + "orc") + } + } + } + + private def convertToLogicalRelation( relation: HiveTableRelation, options: Map[String, String], fileFormatClass: Class[_ <: FileFormat], diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 07ee105404311..8a5ab188a949f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} -import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} +import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -181,49 +180,17 @@ case class RelationConversions( conf: SQLConf, sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] { private def isConvertible(relation: HiveTableRelation): Boolean = { - val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) - serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || - serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) + isConvertible(relation.tableMeta) } - // Return true for Apache ORC and Hive ORC-related configuration names. - // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`. - private def isOrcProperty(key: String) = - key.startsWith("orc.") || key.contains(".orc.") - - private def isParquetProperty(key: String) = - key.startsWith("parquet.") || key.contains(".parquet.") - - private def convert(relation: HiveTableRelation): LogicalRelation = { - val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) - - // Consider table and storage properties. For properties existing in both sides, storage - // properties will supersede table properties. - if (serde.contains("parquet")) { - val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++ - relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA -> - conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString) - sessionCatalog.metastoreCatalog - .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") - } else { - val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++ - relation.tableMeta.storage.properties - if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") { - sessionCatalog.metastoreCatalog.convertToLogicalRelation( - relation, - options, - classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat], - "orc") - } else { - sessionCatalog.metastoreCatalog.convertToLogicalRelation( - relation, - options, - classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], - "orc") - } - } + private def isConvertible(tableMeta: CatalogTable): Boolean = { + val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) + serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || + serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC) } + private val metastoreCatalog = sessionCatalog.metastoreCatalog + override def apply(plan: LogicalPlan): LogicalPlan = { plan resolveOperators { // Write path @@ -231,12 +198,21 @@ case class RelationConversions( // Inserting into partitioned table is not supported in Parquet/Orc data source (yet). if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && !r.isPartitioned && isConvertible(r) => - InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists) + InsertIntoTable(metastoreCatalog.convert(r), partition, + query, overwrite, ifPartitionNotExists) // Read path case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) => - convert(relation) + metastoreCatalog.convert(relation) + + // CTAS + case CreateTable(tableDesc, mode, Some(query)) + if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty && + isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) => + DDLUtils.checkDataColNames(tableDesc) + OptimizedCreateHiveTableAsSelectCommand( + tableDesc, query, query.output.map(_.name), mode) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 66067704195dd..b60d4c71f5941 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -110,6 +110,14 @@ private[spark] object HiveUtils extends Logging { .booleanConf .createWithDefault(true) + val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas") + .doc("When set to true, Spark will try to use built-in data source writer " + + "instead of Hive serde in CTAS. This flag is effective only if " + + "`spark.sql.hive.convertMetastoreParquet` or `spark.sql.hive.convertMetastoreOrc` is " + + "enabled respectively for Parquet and ORC formats") + .booleanConf + .createWithDefault(true) + val HIVE_METASTORE_SHARED_PREFIXES = buildConf("spark.sql.hive.metastore.sharedPrefixes") .doc("A comma separated list of class prefixes that should be loaded using the classloader " + "that is shared between Spark SQL and a specific version of Hive. An example of classes " + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index fd1e931ee0c7a..608f21e726259 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -20,32 +20,26 @@ package org.apache.spark.sql.hive.execution import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} -import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.command.DataWritingCommand +import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation} +import org.apache.spark.sql.hive.HiveSessionCatalog +trait CreateHiveTableAsSelectBase extends DataWritingCommand { + val tableDesc: CatalogTable + val query: LogicalPlan + val outputColumnNames: Seq[String] + val mode: SaveMode -/** - * Create table and insert the query result into it. - * - * @param tableDesc the Table Describe, which may contain serde, storage handler etc. - * @param query the query whose result will be insert into the new relation - * @param mode SaveMode - */ -case class CreateHiveTableAsSelectCommand( - tableDesc: CatalogTable, - query: LogicalPlan, - outputColumnNames: Seq[String], - mode: SaveMode) - extends DataWritingCommand { - - private val tableIdentifier = tableDesc.identifier + protected val tableIdentifier = tableDesc.identifier override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - if (catalog.tableExists(tableIdentifier)) { + val tableExists = catalog.tableExists(tableIdentifier) + + if (tableExists) { assert(mode != SaveMode.Overwrite, s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite") @@ -57,15 +51,8 @@ case class CreateHiveTableAsSelectCommand( return Seq.empty } - // For CTAS, there is no static partition values to insert. - val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap - InsertIntoHiveTable( - tableDesc, - partition, - query, - overwrite = false, - ifPartitionNotExists = false, - outputColumnNames = outputColumnNames).run(sparkSession, child) + val command = getWritingCommand(catalog, tableDesc, tableExists = true) + command.run(sparkSession, child) } else { // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data @@ -77,15 +64,8 @@ case class CreateHiveTableAsSelectCommand( try { // Read back the metadata of the table which was created just now. val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier) - // For CTAS, there is no static partition values to insert. - val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap - InsertIntoHiveTable( - createdTableMeta, - partition, - query, - overwrite = true, - ifPartitionNotExists = false, - outputColumnNames = outputColumnNames).run(sparkSession, child) + val command = getWritingCommand(catalog, createdTableMeta, tableExists = false) + command.run(sparkSession, child) } catch { case NonFatal(e) => // drop the created table. @@ -97,9 +77,89 @@ case class CreateHiveTableAsSelectCommand( Seq.empty[Row] } + // Returns `DataWritingCommand` which actually writes data into the table. + def getWritingCommand( + catalog: SessionCatalog, + tableDesc: CatalogTable, + tableExists: Boolean): DataWritingCommand + override def argString: String = { s"[Database:${tableDesc.database}, " + s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]" } } + +/** + * Create table and insert the query result into it. + * + * @param tableDesc the table description, which may contain serde, storage handler etc. + * @param query the query whose result will be insert into the new relation + * @param mode SaveMode + */ +case class CreateHiveTableAsSelectCommand( + tableDesc: CatalogTable, + query: LogicalPlan, + outputColumnNames: Seq[String], + mode: SaveMode) + extends CreateHiveTableAsSelectBase { + + override def getWritingCommand( + catalog: SessionCatalog, + tableDesc: CatalogTable, + tableExists: Boolean): DataWritingCommand = { + // For CTAS, there is no static partition values to insert. + val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap + InsertIntoHiveTable( + tableDesc, + partition, + query, + overwrite = if (tableExists) false else true, + ifPartitionNotExists = false, + outputColumnNames = outputColumnNames) + } +} + +/** + * Create table and insert the query result into it. This creates Hive table but inserts + * the query result into it by using data source. + * + * @param tableDesc the table description, which may contain serde, storage handler etc. + * @param query the query whose result will be insert into the new relation + * @param mode SaveMode + */ +case class OptimizedCreateHiveTableAsSelectCommand( + tableDesc: CatalogTable, + query: LogicalPlan, + outputColumnNames: Seq[String], + mode: SaveMode) + extends CreateHiveTableAsSelectBase { + + override def getWritingCommand( + catalog: SessionCatalog, + tableDesc: CatalogTable, + tableExists: Boolean): DataWritingCommand = { + val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog + val hiveTable = DDLUtils.readHiveTable(tableDesc) + + val hadoopRelation = metastoreCatalog.convert(hiveTable) match { + case LogicalRelation(t: HadoopFsRelation, _, _, _) => t + case _ => throw new AnalysisException(s"$tableIdentifier should be converted to " + + "HadoopFsRelation.") + } + + InsertIntoHadoopFsRelationCommand( + hadoopRelation.location.rootPaths.head, + Map.empty, // We don't support to convert partitioned table. + false, + Seq.empty, // We don't support to convert partitioned table. + hadoopRelation.bucketSpec, + hadoopRelation.fileFormat, + hadoopRelation.options, + query, + if (tableExists) mode else SaveMode.Overwrite, + Some(tableDesc), + Some(hadoopRelation.location), + query.output.map(_.name)) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala index e5c9df05d5674..470c6a342b4dd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala @@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton } } } + + test("SPARK-25271: write empty map into hive parquet table") { + import testImplicits._ + + Seq(Map(1 -> "a"), Map.empty[Int, String]).toDF("m").createOrReplaceTempView("p") + withTempView("p") { + val targetTable = "targetTable" + withTable(targetTable) { + sql(s"CREATE TABLE $targetTable STORED AS PARQUET AS SELECT m FROM p") + checkAnswer(sql(s"SELECT m FROM $targetTable"), + Row(Map(1 -> "a")) :: Row(Map.empty[Int, String]) :: Nil) + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index fab2a27cdef17..6acf44606cbbe 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2276,6 +2276,46 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } + test("SPARK-25271: Hive ctas commands should use data source if it is convertible") { + withTempView("p") { + Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p") + + Seq("orc", "parquet").foreach { format => + Seq(true, false).foreach { isConverted => + withSQLConf( + HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted", + HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") { + Seq(true, false).foreach { isConvertedCtas => + withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") { + + val targetTable = "targetTable" + withTable(targetTable) { + val df = sql(s"CREATE TABLE $targetTable STORED AS $format AS SELECT id FROM p") + checkAnswer(sql(s"SELECT id FROM $targetTable"), + Row(1) :: Row(2) :: Row(3) :: Nil) + + val ctasDSCommand = df.queryExecution.analyzed.collect { + case _: OptimizedCreateHiveTableAsSelectCommand => true + }.headOption + val ctasCommand = df.queryExecution.analyzed.collect { + case _: CreateHiveTableAsSelectCommand => true + }.headOption + + if (isConverted && isConvertedCtas) { + assert(ctasDSCommand.nonEmpty) + assert(ctasCommand.isEmpty) + } else { + assert(ctasDSCommand.isEmpty) + assert(ctasCommand.nonEmpty) + } + } + } + } + } + } + } + } + } test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") { withSQLConf(SQLConf.CBO_ENABLED.key -> "true") { From 04d8e3a33c6bb08b2891ca52613cd5ccd24a69dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E4=BA=AE?= Date: Thu, 20 Dec 2018 13:22:12 +0800 Subject: [PATCH 05/11] [SPARK-26318][SQL] Deprecate Row.merge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changes were proposed in this pull request? Deprecate Row.merge ## How was this patch tested? N/A Closes #23271 from KyleLi1985/master. Authored-by: 李亮 Signed-off-by: Hyukjin Kwon --- sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index e12bf9616e2de..4f5af9ac80b10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -57,6 +57,7 @@ object Row { /** * Merge multiple rows into a single row, one after another. */ + @deprecated("This method is deprecated and will be removed in future versions.", "3.0.0") def merge(rows: Row*): Row = { // TODO: Improve the performance of this if used in performance critical part. new GenericRow(rows.flatMap(_.toSeq).toArray) From 98c0ca78610ccf62784081353584717c62285485 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 20 Dec 2018 14:17:44 +0800 Subject: [PATCH 06/11] [SPARK-26308][SQL] Avoid cast of decimals for ScalaUDF ## What changes were proposed in this pull request? Currently, when we infer the schema for scala/java decimals, we return as data type the `SYSTEM_DEFAULT` implementation, ie. the decimal type with precision 38 and scale 18. But this is not right, as we know nothing about the right precision and scale and these values can be not enough to store the data. This problem arises in particular with UDF, where we cast all the input of type `DecimalType` to a `DecimalType(38, 18)`: in case this is not enough, null is returned as input for the UDF. The PR defines a custom handling for casting to the expected data types for ScalaUDF: the decimal precision and scale is picked from the input, so no casting to different and maybe wrong percision and scale happens. ## How was this patch tested? added UTs Closes #23308 from mgaido91/SPARK-26308. Authored-by: Marco Gaido Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/TypeCoercion.scala | 31 ++++++++++++++++++ .../sql/catalyst/expressions/ScalaUDF.scala | 2 +- .../scala/org/apache/spark/sql/UDFSuite.scala | 32 ++++++++++++++++++- 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 133fa119b7aa6..1706b3eece6d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -879,6 +879,37 @@ object TypeCoercion { } } e.withNewChildren(children) + + case udf: ScalaUDF if udf.inputTypes.nonEmpty => + val children = udf.children.zip(udf.inputTypes).map { case (in, expected) => + implicitCast(in, udfInputToCastType(in.dataType, expected)).getOrElse(in) + } + udf.withNewChildren(children) + } + + private def udfInputToCastType(input: DataType, expectedType: DataType): DataType = { + (input, expectedType) match { + // SPARK-26308: avoid casting to an arbitrary precision and scale for decimals. Please note + // that precision and scale cannot be inferred properly for a ScalaUDF because, when it is + // created, it is not bound to any column. So here the precision and scale of the input + // column is used. + case (in: DecimalType, _: DecimalType) => in + case (ArrayType(dtIn, _), ArrayType(dtExp, nullableExp)) => + ArrayType(udfInputToCastType(dtIn, dtExp), nullableExp) + case (MapType(keyDtIn, valueDtIn, _), MapType(keyDtExp, valueDtExp, nullableExp)) => + MapType(udfInputToCastType(keyDtIn, keyDtExp), + udfInputToCastType(valueDtIn, valueDtExp), + nullableExp) + case (StructType(fieldsIn), StructType(fieldsExp)) => + val fieldTypes = + fieldsIn.map(_.dataType).zip(fieldsExp.map(_.dataType)).map { case (dtIn, dtExp) => + udfInputToCastType(dtIn, dtExp) + } + StructType(fieldsExp.zip(fieldTypes).map { case (field, newDt) => + field.copy(dataType = newDt) + }) + case (_, other) => other + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala index fae90caebf96c..a23aaa3a0b3ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala @@ -52,7 +52,7 @@ case class ScalaUDF( udfName: Option[String] = None, nullable: Boolean = true, udfDeterministic: Boolean = true) - extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression { + extends Expression with NonSQLExpression with UserDefinedExpression { // The constructor for SPARK 2.1 and 2.2 def this( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 20dcefa7e3cad..a26d306cff6b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.math.BigDecimal + import org.apache.spark.sql.api.java._ import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.execution.QueryExecution @@ -26,7 +28,7 @@ import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationComm import org.apache.spark.sql.functions.{lit, udf} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.test.SQLTestData._ -import org.apache.spark.sql.types.{DataTypes, DoubleType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.QueryExecutionListener @@ -420,4 +422,32 @@ class UDFSuite extends QueryTest with SharedSQLContext { checkAnswer(df, Seq(Row("null1x"), Row(null), Row("N3null"))) } } + + test("SPARK-26308: udf with decimal") { + val df1 = spark.createDataFrame( + sparkContext.parallelize(Seq(Row(new BigDecimal("2011000000000002456556")))), + StructType(Seq(StructField("col1", DecimalType(30, 0))))) + val udf1 = org.apache.spark.sql.functions.udf((value: BigDecimal) => { + if (value == null) null else value.toBigInteger.toString + }) + checkAnswer(df1.select(udf1(df1.col("col1"))), Seq(Row("2011000000000002456556"))) + } + + test("SPARK-26308: udf with complex types of decimal") { + val df1 = spark.createDataFrame( + sparkContext.parallelize(Seq(Row(Array(new BigDecimal("2011000000000002456556"))))), + StructType(Seq(StructField("col1", ArrayType(DecimalType(30, 0)))))) + val udf1 = org.apache.spark.sql.functions.udf((arr: Seq[BigDecimal]) => { + arr.map(value => if (value == null) null else value.toBigInteger.toString) + }) + checkAnswer(df1.select(udf1($"col1")), Seq(Row(Array("2011000000000002456556")))) + + val df2 = spark.createDataFrame( + sparkContext.parallelize(Seq(Row(Map("a" -> new BigDecimal("2011000000000002456556"))))), + StructType(Seq(StructField("col1", MapType(StringType, DecimalType(30, 0)))))) + val udf2 = org.apache.spark.sql.functions.udf((map: Map[String, BigDecimal]) => { + map.mapValues(value => if (value == null) null else value.toBigInteger.toString) + }) + checkAnswer(df2.select(udf2($"col1")), Seq(Row(Map("a" -> "2011000000000002456556")))) + } } From 7c8f4756c34a0b00931c2987c827a18d989e6c08 Mon Sep 17 00:00:00 2001 From: zhoukang Date: Thu, 20 Dec 2018 08:26:25 -0600 Subject: [PATCH 07/11] [SPARK-24687][CORE] Avoid job hanging when generate task binary causes fatal error ## What changes were proposed in this pull request? When NoClassDefFoundError thrown,it will cause job hang. `Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: Lcom/xxx/data/recommend/aggregator/queue/QueueName; at java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.privateGetDeclaredFields(Class.java:2436) at java.lang.Class.getDeclaredField(Class.java:1946) at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659) at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.(ObjectStreamClass.java:468) at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1212) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1119) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)` It is caused by NoClassDefFoundError will not catch up during task seriazation. `var taskBinary: Broadcast[Array[Byte]] = null try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). val taskBinaryBytes: Array[Byte] = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } taskBinary = sc.broadcast(taskBinaryBytes) } catch { // In the case of a failure during serialization, abort the stage. case e: NotSerializableException => abortStage(stage, "Task not serializable: " + e.toString, Some(e)) runningStages -= stage // Abort execution return case NonFatal(e) => abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return }` image below shows that stage 33 blocked and never be scheduled. 2018-06-28 4 28 42 2018-06-28 4 28 49 ## How was this patch tested? UT Closes #21664 from caneGuy/zhoukang/fix-noclassdeferror. Authored-by: zhoukang Signed-off-by: Sean Owen --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 06966e77db81e..6f4c326442e1e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1170,9 +1170,11 @@ private[spark] class DAGScheduler( // Abort execution return - case NonFatal(e) => + case e: Throwable => abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage + + // Abort execution return } From a888d202ab719ccb13b328aa445341d1d6b06881 Mon Sep 17 00:00:00 2001 From: Jorge Machado Date: Thu, 20 Dec 2018 08:29:51 -0600 Subject: [PATCH 08/11] [SPARK-26324][DOCS] Add Spark docs for Running in Mesos with SSL ## What changes were proposed in this pull request? Added docs for running spark jobs with Mesos on SSL Closes #23342 from jomach/master. Lead-authored-by: Jorge Machado Co-authored-by: Jorge Machado Co-authored-by: Jorge Machado Co-authored-by: Jorge Machado Signed-off-by: Sean Owen --- docs/running-on-mesos.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 968d668e2c93a..a07773c1c71e1 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -108,6 +108,19 @@ Please note that if you specify multiple ways to obtain the credentials then the An equivalent order applies for the secret. Essentially we prefer the configuration to be specified directly rather than indirectly by files, and we prefer that configuration settings are used over environment variables. +### Deploy to a Mesos running on Secure Sockets + +If you want to deploy a Spark Application into a Mesos cluster that is running in a secure mode there are some environment variables that need to be set. + +- `LIBPROCESS_SSL_ENABLED=true` enables SSL communication +- `LIBPROCESS_SSL_VERIFY_CERT=false` verifies the ssl certificate +- `LIBPROCESS_SSL_KEY_FILE=pathToKeyFile.key` path to key +- `LIBPROCESS_SSL_CERT_FILE=pathToCRTFile.crt` the certificate file to be used + +All options can be found at http://mesos.apache.org/documentation/latest/ssl/ + +Then submit happens as described in Client mode or Cluster mode below + ## Uploading Spark Package When Mesos runs a task on a Mesos slave for the first time, that slave must have a Spark binary From 6692bacf3e74e7a17d8e676e8a06ab198f85d328 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 20 Dec 2018 10:05:56 -0800 Subject: [PATCH 09/11] [SPARK-26409][SQL][TESTS] SQLConf should be serializable in test sessions ## What changes were proposed in this pull request? `SQLConf` is supposed to be serializable. However, currently it is not serializable in `WithTestConf`. `WithTestConf` uses the method `overrideConfs` in closure, while the classes which implements it (`TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder`) are not serializable. This PR is to use a local variable to fix it. ## How was this patch tested? Add unit test. Closes #23352 from gengliangwang/serializableSQLConf. Authored-by: Gengliang Wang Signed-off-by: gatorsmile --- .../apache/spark/sql/internal/BaseSessionStateBuilder.scala | 3 ++- .../test/scala/org/apache/spark/sql/SerializationSuite.scala | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index ac07e1f6bb4f8..319c2649592fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -309,13 +309,14 @@ private[sql] trait WithTestConf { self: BaseSessionStateBuilder => def overrideConfs: Map[String, String] override protected lazy val conf: SQLConf = { + val overrideConfigurations = overrideConfs val conf = parentState.map(_.conf.clone()).getOrElse { new SQLConf { clear() override def clear(): Unit = { super.clear() // Make sure we start with the default test configs even after clear - overrideConfs.foreach { case (key, value) => setConfString(key, value) } + overrideConfigurations.foreach { case (key, value) => setConfString(key, value) } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SerializationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SerializationSuite.scala index cd6b2647e0be6..1a1c956aed3d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SerializationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SerializationSuite.scala @@ -27,4 +27,9 @@ class SerializationSuite extends SparkFunSuite with SharedSQLContext { val spark = SparkSession.builder.getOrCreate() new JavaSerializer(new SparkConf()).newInstance().serialize(spark.sqlContext) } + + test("[SPARK-26409] SQLConf should be serializable") { + val spark = SparkSession.builder.getOrCreate() + new JavaSerializer(new SparkConf()).newInstance().serialize(spark.sessionState.conf) + } } From 3d6b44d9ea92dc1eabb8f211176861e51240bf93 Mon Sep 17 00:00:00 2001 From: Ngone51 Date: Thu, 20 Dec 2018 10:25:52 -0800 Subject: [PATCH 10/11] [SPARK-26392][YARN] Cancel pending allocate requests by taking locality preference into account ## What changes were proposed in this pull request? Right now, we cancel pending allocate requests by its sending order. I thing we can take locality preference into account when do this to perfom least impact on task locality preference. ## How was this patch tested? N.A. Closes #23344 from Ngone51/dev-cancel-pending-allocate-requests-by-taking-locality-preference-into-account. Authored-by: Ngone51 Signed-off-by: Marcelo Vanzin --- .../spark/deploy/yarn/YarnAllocator.scala | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index d37d0d66d8ae2..54b1ec266113f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -294,6 +294,15 @@ private[yarn] class YarnAllocator( s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " + s"executorsStarting: ${numExecutorsStarting.get}") + // Split the pending container request into three groups: locality matched list, locality + // unmatched list and non-locality list. Take the locality matched container request into + // consideration of container placement, treat as allocated containers. + // For locality unmatched and locality free container requests, cancel these container + // requests, since required locality preference has been changed, recalculating using + // container placement strategy. + val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality( + hostToLocalTaskCounts, pendingAllocate) + if (missing > 0) { if (log.isInfoEnabled()) { var requestContainerMessage = s"Will request $missing executor container(s), each with " + @@ -306,15 +315,6 @@ private[yarn] class YarnAllocator( logInfo(requestContainerMessage) } - // Split the pending container request into three groups: locality matched list, locality - // unmatched list and non-locality list. Take the locality matched container request into - // consideration of container placement, treat as allocated containers. - // For locality unmatched and locality free container requests, cancel these container - // requests, since required locality preference has been changed, recalculating using - // container placement strategy. - val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality( - hostToLocalTaskCounts, pendingAllocate) - // cancel "stale" requests for locations that are no longer needed staleRequests.foreach { stale => amClient.removeContainerRequest(stale) @@ -374,14 +374,9 @@ private[yarn] class YarnAllocator( val numToCancel = math.min(numPendingAllocate, -missing) logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " + s"total $targetNumExecutors executors.") - - val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource) - if (!matchingRequests.isEmpty) { - matchingRequests.iterator().next().asScala - .take(numToCancel).foreach(amClient.removeContainerRequest) - } else { - logWarning("Expected to find pending requests, but found none.") - } + // cancel pending allocate requests by taking locality preference into account + val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel) + cancelRequests.foreach(amClient.removeContainerRequest) } } From aa0d4ca8bab08a467645080a5b8a28bf6dd8a042 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Thu, 20 Dec 2018 11:22:49 -0800 Subject: [PATCH 11/11] [SPARK-25970][ML] Add Instrumentation to PrefixSpan ## What changes were proposed in this pull request? Add Instrumentation to PrefixSpan ## How was this patch tested? existing tests Closes #22971 from zhengruifeng/log_PrefixSpan. Authored-by: zhengruifeng Signed-off-by: Xiangrui Meng --- .../src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala index 2a3413553a6af..b0006a8d4a58e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala @@ -20,6 +20,7 @@ package org.apache.spark.ml.fpm import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.param._ import org.apache.spark.ml.util.Identifiable +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.fpm.{PrefixSpan => mllibPrefixSpan} import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.col @@ -135,7 +136,10 @@ final class PrefixSpan(@Since("2.4.0") override val uid: String) extends Params * - `freq: Long` */ @Since("2.4.0") - def findFrequentSequentialPatterns(dataset: Dataset[_]): DataFrame = { + def findFrequentSequentialPatterns(dataset: Dataset[_]): DataFrame = instrumented { instr => + instr.logDataset(dataset) + instr.logParams(this, params: _*) + val sequenceColParam = $(sequenceCol) val inputType = dataset.schema(sequenceColParam).dataType require(inputType.isInstanceOf[ArrayType] &&