
[SPARK-26403][SQL] Support pivoting using array column for pivot(column) API #23349

Closed · wants to merge 5 commits

Conversation

@HyukjinKwon (Member) commented Dec 19, 2018

What changes were proposed in this pull request?

This PR fixes `pivot(Column)` so that it accepts `collection.mutable.WrappedArray`.

Note that we return `collection.mutable.WrappedArray` from `ArrayType`, and `Literal.apply` doesn't support this.

We can unwrap the array and use it for type dispatch.

```scala
val df = Seq(
  (2, Seq.empty[String]),
  (2, Seq("a", "x")),
  (3, Seq.empty[String]),
  (3, Seq("a", "x"))).toDF("x", "s")
df.groupBy("x").pivot("s").count().show()
```

Before:

```
Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray()
java.lang.RuntimeException: Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray()
	at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:80)
	at org.apache.spark.sql.RelationalGroupedDataset.$anonfun$pivot$2(RelationalGroupedDataset.scala:427)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:39)
	at scala.collection.TraversableLike.map(TraversableLike.scala:237)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:425)
	at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:406)
	at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:317)
	at org.apache.spark.sql.DataFramePivotSuite.$anonfun$new$1(DataFramePivotSuite.scala:341)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
```

After:

```
+---+---+------+
|  x| []|[a, x]|
+---+---+------+
|  3|  1|     1|
|  2|  1|     1|
+---+---+------+
```
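The unwrap itself is a one-line dispatch case in `Literal.apply` (see the diff further down the thread). As a standalone illustration of the technique, not Spark's actual code, here is a minimal sketch:

```scala
import scala.collection.mutable.WrappedArray

// Minimal sketch of the dispatch technique used by the fix: unwrap a
// WrappedArray to its backing array, then re-dispatch on the array case.
// `describe` is an illustrative stand-in, not part of Spark.
def describe(v: Any): String = v match {
  case a: WrappedArray[_] => describe(a.array)      // unwrap and recurse
  case a: Array[_]        => s"array(${a.mkString(", ")})"
  case other              => s"scalar($other)"
}

println(describe(WrappedArray.make(Array("a", "x"))))  // array(a, x)
```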

How was this patch tested?

Manually tested, and unit tests were added.

@HyukjinKwon (Member, Author) commented Dec 19, 2018

cc @cloud-fan

While checking the code, I found two things below that I would like to mention.

1. `Literal.create(value)` already supports this when the input type is explicit.

```scala
scala> Literal.create(collection.mutable.WrappedArray.make(Array(1)))
res0: org.apache.spark.sql.catalyst.expressions.Literal = [1]

scala> Literal.create(collection.mutable.WrappedArray.make(Array(1)): Seq[Int])
res1: org.apache.spark.sql.catalyst.expressions.Literal = [1]
```

However, it currently does not when we don't know the type:

```scala
scala> Literal.create(collection.mutable.WrappedArray.make(Array(1)): Any)
java.lang.RuntimeException: Unsupported literal type class scala.collection.mutable.WrappedArray$ofInt WrappedArray(1)
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:79)
  at org.apache.spark.sql.catalyst.expressions.Literal$.$anonfun$create$2(literals.scala:171)
  at scala.util.Failure.getOrElse(Try.scala:222)
  at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:171)
  ... 49 elided
```

This PR also fixes it, as below:

```scala
scala> Literal.create(collection.mutable.WrappedArray.make(Array(1)): Any)
res0: org.apache.spark.sql.catalyst.expressions.Literal = [1]
```

2. `Literal.apply(value)` can't support `Seq[_]` for now:

```scala
scala> Literal.apply(Seq(1))
java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.$colon$colon List(1)
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:79)
  ... 49 elided
```

I would like to leave these as a separate discussion outside of this PR.

@SparkQA commented Dec 19, 2018

Test build #100301 has finished for PR 23349 at commit 741ee69.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author):

retest this please

@SparkQA commented Dec 20, 2018

Test build #100317 has finished for PR 23349 at commit 741ee69.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author):

BTW, if we give the values via the `pivot(column, values)` API, it works as described in the JIRA.
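For reference, a hedged sketch of that explicit-values call, assuming the values are passed as plain arrays (which `Literal.apply` supports, unlike `Seq`); the exact form described in the JIRA may differ:

```scala
// Hypothetical workaround sketch using the df from the PR description:
// pass the pivot values explicitly as arrays so Literal.apply can
// dispatch on Array rather than failing on WrappedArray/Seq.
df.groupBy("x")
  .pivot(df("s"), Seq(Array.empty[String], Array("a", "x")))
  .count()
  .show()
```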

```diff
@@ -67,6 +67,7 @@ object Literal {
   case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
   case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
   case a: Array[Byte] => Literal(a, BinaryType)
+  case a: collection.mutable.WrappedArray[_] => apply(a.array)
```
@cloud-fan (Contributor) commented on the diff:

If you look at the doc of `Row.get`, the official type for array type is `scala.collection.Seq`.

In practice, we should never match `collection.mutable.WrappedArray` explicitly. Users usually don't create this type explicitly.
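To see why `WrappedArray` shows up at all, a small sketch (reusing the `df` from the PR description): collecting an `ArrayType` column hands back `Seq` values whose concrete runtime class is a `WrappedArray` in Scala 2.12.

```scala
// Row.get's documented type for ArrayType is scala.collection.Seq, but the
// concrete class underneath is a WrappedArray (hence the original failure).
val row = df.select("s").head()
val s: Seq[String] = row.getSeq[String](0)  // official API type: Seq
println(s.getClass)  // class scala.collection.mutable.WrappedArray$ofRef
```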

@HyukjinKwon (Member, Author):

I don't think we can easily support `Seq` with a minimal change here - let me take another look at this. That's why I targeted supporting an array column in `pivot` specifically. Maybe I can do the type check within `pivot` before we call `Literal`.

@HyukjinKwon (Member, Author):

Yeah, I'm not sure I know a way to support `Seq` here. I can make the changes inside `pivot` to be more conservative, @cloud-fan. Let me know.

@cloud-fan (Contributor):

Isn't it simply `apply(seq.toArray)`?

@HyukjinKwon (Member, Author):

Nope, that doesn't compile:


```
[error] /.../spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala:70: No ClassTag available for _
[error]     case a: Seq[_] => apply(a.toArray)
[error]                               ^
[error] one error found
```

We can't use a type tag here either, since what we need is `Any`:

```scala
def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
```

I know there is some subtlety here. That's why I tried to explain it in detail.
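For context on that compile error: `toArray` needs a `ClassTag` for the element type in order to build the runtime array, and `Seq[_]` provides none. A minimal sketch of the mechanics, with illustrative names:

```scala
import scala.reflect.ClassTag

// toArray's signature is roughly `def toArray[B >: A : ClassTag]: Array[B]`.
// With a concrete element type, the ClassTag is summoned implicitly:
def seqToArray[T: ClassTag](s: Seq[T]): Array[T] = s.toArray

seqToArray(Seq(1, 2, 3))  // Array(1, 2, 3), runtime class [I
// For a Seq[_], there is no element ClassTag to summon, hence
// "No ClassTag available for _".
```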

@cloud-fan (Contributor):

Can we provide the class tag manually? Like:

```scala
apply(seq.toArray(classTag[Any]))
```

@HyukjinKwon (Member, Author):

That wouldn't work:

```scala
scala> Array(1,2,3).asInstanceOf[Array[Any]]
java.lang.ClassCastException: [I cannot be cast to [Ljava.lang.Object;
  ... 29 elided

scala> classOf[Array[Any]].getComponentType()
res1: Class[_] = class java.lang.Object
```

The cases below fail to compile:

```scala
apply(a.toArray[classTag[Any]])
apply(a.toArray[scala.reflect.classTag[Any]])
```

The case below causes a runtime failure:

```scala
case a: Seq[Any] => apply(a.toArray)
```

```
Unsupported component type class java.lang.Object in arrays;
org.apache.spark.sql.AnalysisException: Unsupported component type class java.lang.Object in arrays;
	at org.apache.spark.sql.catalyst.expressions.Literal$.componentTypeToDataType(literals.scala:119)
	at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:72)
	at org.apache.spark.sql.RelationalGroupedDataset.$anonfun$pivot$2(RelationalGroupedDataset.scala:427)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:39)
```
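The runtime failure comes down to erasure: `toArray` on a `Seq[Any]` builds an `Array[AnyRef]`, whose component type `java.lang.Object` carries no usable element type for `componentTypeToDataType`. A quick check:

```scala
// Seq[Any].toArray yields an Object array (primitives stay boxed), so the
// array's component type tells us nothing about the element DataType.
val a: Array[Any] = Seq(1, 2, 3).toArray
println(a.getClass)                   // class [Ljava.lang.Object;
println(a.getClass.getComponentType)  // class java.lang.Object
```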

@HyukjinKwon (Member, Author):

Hey @cloud-fan, I think this change doesn't make `Literal` or `pivot` worse. If you're worried about supporting `WrappedArray` in `Literal`, I can make the change only within `pivot`.

@cloud-fan (Contributor):

> I can make the change only within pivot.

Can you give it a try first?

```scala
// ArrayType returns a `WrappedArray` but currently `Literal.apply`
// does not support this type although it supports a normal array.
// Here manually unwrap to make it an array. See also SPARK-26403.
case v: collection.mutable.WrappedArray[_] => Literal.apply(v.array)
```
@HyukjinKwon (Member, Author):

@cloud-fan, I didn't mean anything special, just this :).

@cloud-fan (Contributor):

Ah, I see. Then I think it's better to put it in `Literal.apply`, as it can help more cases.

@HyukjinKwon (Member, Author):

Yup.

@SparkQA commented Dec 30, 2018

Test build #100567 has finished for PR 23349 at commit 392c87f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 2, 2019

Test build #100631 has finished for PR 23349 at commit d91ade6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

thanks, merging to master!

@HyukjinKwon (Member, Author):

Haha, looks like it was missed. I merged it instead :D. Happy new year, Wenchen.

@cloud-fan (Contributor):

Ah sorry, the merge script failed with a network issue and I didn't notice. Thanks for merging!

@asfgit closed this in 56967b7 on Jan 3, 2019.
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
Closes apache#23349 from HyukjinKwon/SPARK-26403.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@HyukjinKwon deleted the SPARK-26403 branch on March 3, 2020 at 01:20.