From 741ee69178d3c1df8c85be1f9dab102beba90a27 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 19 Dec 2018 17:17:11 +0800 Subject: [PATCH 1/5] Support pivoting using array column for `pivot(column)` API --- .../spark/sql/catalyst/expressions/literals.scala | 1 + .../catalyst/expressions/LiteralExpressionSuite.scala | 2 ++ .../org/apache/spark/sql/DataFramePivotSuite.scala | 11 +++++++++++ 3 files changed, 14 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 34d252886ffb0..48beffa18a551 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -67,6 +67,7 @@ object Literal { case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType) case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType) case a: Array[Byte] => Literal(a, BinaryType) + case a: collection.mutable.WrappedArray[_] => apply(a.array) case a: Array[_] => val elementType = componentTypeToDataType(a.getClass.getComponentType()) val dataType = ArrayType(elementType) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala index 3ea6bfac9ddca..133aaa449ea44 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala @@ -179,6 +179,8 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { checkArrayLiteral(Array("a", "b", "c")) checkArrayLiteral(Array(1.0, 4.0)) checkArrayLiteral(Array(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR)) + val arr = collection.mutable.WrappedArray.make(Array(1.0, 4.0)) + checkEvaluation(Literal(arr), toCatalyst(arr)) } test("seq") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala index b52ca58c07d27..8c2c11be9b6fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala @@ -333,4 +333,15 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext { } assert(exception.getMessage.contains("Unsupported literal type")) } + + test("SPARK-26403: pivoting by array column") { + val df = Seq( + (2, Seq.empty[String]), + (2, Seq("a", "x")), + (3, Seq.empty[String]), + (3, Seq("a", "x"))).toDF("x", "s") + val expected = Seq((3, 1, 1), (2, 1, 1)).toDF + val actual = df.groupBy("x").pivot("s").count() + checkAnswer(actual, expected) + } } From 825d14c38d31096c97b1e47f2476216f106255e9 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Mon, 31 Dec 2018 00:30:48 +0800 Subject: [PATCH 2/5] Alternative take --- .../apache/spark/sql/catalyst/expressions/literals.scala | 1 - .../org/apache/spark/sql/RelationalGroupedDataset.scala | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 48beffa18a551..34d252886ffb0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -67,7 +67,6 @@ object Literal { case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType) case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType) case a: Array[Byte] => Literal(a, BinaryType) - case a: collection.mutable.WrappedArray[_] => apply(a.array) case a: Array[_] => val elementType = componentTypeToDataType(a.getClass.getComponentType()) val dataType = ArrayType(elementType) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index e85636d82a62c..b0f5d9863417e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -422,10 +422,14 @@ class RelationalGroupedDataset protected[sql]( def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = { groupType match { case RelationalGroupedDataset.GroupByType => - val valueExprs = values.map(_ match { + val valueExprs = values.map { case c: Column => c.expr + // ArrayType returns a `WrappedArray` but currently `Literal.apply` + // does not support this type although it supports a normal array. + // Here manually unwrap to make it an array. See also SPARK-26403. + case v: collection.mutable.WrappedArray[_] => Literal.apply(v.array) case v => Literal.apply(v) - }) + } new RelationalGroupedDataset( df, groupingExprs, From 392c87fab1294b2ea55cfa1e933ba379aa1ee527 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Mon, 31 Dec 2018 00:31:47 +0800 Subject: [PATCH 3/5] Remove literal test --- .../spark/sql/catalyst/expressions/LiteralExpressionSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala index 133aaa449ea44..3ea6bfac9ddca 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala @@ -179,8 +179,6 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { checkArrayLiteral(Array("a", "b", "c")) checkArrayLiteral(Array(1.0, 4.0)) checkArrayLiteral(Array(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR)) - val arr = collection.mutable.WrappedArray.make(Array(1.0, 4.0)) - checkEvaluation(Literal(arr), toCatalyst(arr)) } test("seq") { From 7f8c3e989cb004579dae5772ec2ae4430e995139 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 2 Jan 2019 11:33:09 +0800 Subject: [PATCH 4/5] Revert "Alternative take" This reverts commit 825d14c38d31096c97b1e47f2476216f106255e9. --- .../apache/spark/sql/catalyst/expressions/literals.scala | 1 + .../org/apache/spark/sql/RelationalGroupedDataset.scala | 8 ++------ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 34d252886ffb0..48beffa18a551 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -67,6 +67,7 @@ object Literal { case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType) case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType) case a: Array[Byte] => Literal(a, BinaryType) + case a: collection.mutable.WrappedArray[_] => apply(a.array) case a: Array[_] => val elementType = componentTypeToDataType(a.getClass.getComponentType()) val dataType = ArrayType(elementType) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index b0f5d9863417e..e85636d82a62c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -422,14 +422,10 @@ class RelationalGroupedDataset protected[sql]( def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = { groupType match { case RelationalGroupedDataset.GroupByType => - val valueExprs = values.map { + val valueExprs = values.map(_ match { case c: Column => c.expr - // ArrayType returns a `WrappedArray` but currently `Literal.apply` - // does not support this type although it supports a normal array. - // Here manually unwrap to make it an array. See also SPARK-26403. - case v: collection.mutable.WrappedArray[_] => Literal.apply(v.array) case v => Literal.apply(v) - } + }) new RelationalGroupedDataset( df, groupingExprs, From d91ade60e14dbb7327351de5c59f50ba7d66e26a Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 2 Jan 2019 11:33:21 +0800 Subject: [PATCH 5/5] Revert "Remove literal test" This reverts commit 392c87fab1294b2ea55cfa1e933ba379aa1ee527. --- .../spark/sql/catalyst/expressions/LiteralExpressionSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala index 3ea6bfac9ddca..133aaa449ea44 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala @@ -179,6 +179,8 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { checkArrayLiteral(Array("a", "b", "c")) checkArrayLiteral(Array(1.0, 4.0)) checkArrayLiteral(Array(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR)) + val arr = collection.mutable.WrappedArray.make(Array(1.0, 4.0)) + checkEvaluation(Literal(arr), toCatalyst(arr)) } test("seq") {