
[SPARK-16792][SQL] Dataset containing a Case Class with a List type causes a CompileException (converting sequence to list) #16240

Closed · 12 commits

Conversation

@michalsenkyr (Contributor) commented Dec 10, 2016

What changes were proposed in this pull request?

Added a `to` call, which uses `CanBuildFrom[_, _, _]`, at the end of the code generated by `ScalaReflection.deserializerFor` to convert the result into an arbitrary subtype of `Seq[_]` whenever the requested type is not a supertype of `WrappedArray[_]`.

Care was taken to preserve the original deserialization path where possible, avoiding the conversion overhead in cases where it is not needed.

`ScalaReflection.serializerFor` could already serialize any `Seq[_]`, so it was not altered.

`SQLImplicits` had to be altered, and new implicit encoders added, to permit serialization of other sequence types.

Also fixes [SPARK-16815] Dataset[List[T]] leads to ArrayStoreException
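
To illustrate the mechanism, here is a minimal plain-Scala sketch of the conversion the generated code performs (illustrative only; the deserializer emits an equivalent call via codegen):

```scala
import scala.collection.mutable.WrappedArray

// Deserialization naturally yields a WrappedArray over the underlying array.
val wrapped: Seq[String] = WrappedArray.make(Array("a", "b", "c"))

// If the requested type is a supertype of WrappedArray (e.g. Seq), no
// conversion is needed and the original path is kept.
val asSeq: Seq[String] = wrapped

// Otherwise a `to` call converts via the target type's implicit CanBuildFrom.
val asList: List[String] = wrapped.to[List]
```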

How was this patch tested?

```bash
./build/mvn -DskipTests clean package && ./dev/run-tests
```

Also manual execution of the following sets of commands in the Spark shell:

```scala
case class TestCC(key: Int, letters: List[String])

val ds1 = sc.makeRDD(Seq(
  (List("D")),
  (List("S","H")),
  (List("F","H")),
  (List("D","L","L"))
)).map(x => (x.length, x)).toDF("key", "letters").as[TestCC]

val test1 = ds1.map(_.key)
test1.show
```

```scala
case class X(l: List[String])
spark.createDataset(Seq(List("A"))).map(X).show
```

```scala
spark.sqlContext.createDataset(sc.parallelize(List(1) :: Nil)).collect
```

After adding arbitrary sequence support also tested with the following commands:

```scala
case class QueueClass(q: scala.collection.immutable.Queue[Int])

spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect
```

@michalsenkyr (Contributor Author) commented Dec 10, 2016

(edit: old commit info; doesn't apply anymore) I would like to add that the conversion is specific to `List[_]`. I can add support for arbitrary sequence types through the use of `CanBuildFrom` if that is desirable.

We could also support Sets this way by serializing them into arrays. See SPARK-17414.

@SparkQA commented Dec 10, 2016

Test build #3488 has finished for PR 16240 at commit 8c15b47.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@michalsenkyr (Contributor Author) commented Dec 10, 2016

Added support for arbitrary sequences.

Now Queues, ArrayBuffers and the like can also be used in Datasets (all are serialized into ArrayType).

I had to alter `SQLImplicits` and add new implicit encoders. The new encoders are for the Seq-with-Product combination (essentially only List) to disambiguate between the Seq and Product encoders.

However, I encountered another problem with implicits. When constructing a complex Dataset using `Seq.toDS` that includes both a Product (like a case class or a tuple) and a sequence, the encoder doesn't seem to be resolved. When constructing with `spark.createDataset` or when transforming an existing Dataset, there is no problem.

Example code where the problem manifests:

```scala
Seq(Queue(Tuple1(1))).toDS()
```

I added a workaround by defining a specific implicit just for Seqs. This makes the problem go away for existing usages (with `Seq`); however, other collections cannot be constructed by `Seq.toDS` unless `newProductSeqEncoder[A, T]` is created with the correct type parameters.

If anybody knows how to fix this, let me know.

@michalsenkyr (Contributor Author)

Possible optimization: instead of conversions using `to`, we could use `Builder`s. This way we would get rid of the conversion overhead. It would require adding a new codegen method that operates similarly to `MapObjects` but uses a provided `Builder` to build the collection directly.
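
A rough sketch of the two approaches in plain Scala (illustrative only; the actual change would live in codegen):

```scala
import scala.collection.immutable.Queue

val input = Array(1, 2, 3)

// Current approach: materialize an intermediate Seq, then convert with `to`
// (two passes over the data).
val viaConversion: Queue[Int] = input.toSeq.to[Queue]

// Proposed approach: build the target collection directly with its Builder
// (single pass, no intermediate conversion).
val builder = Queue.newBuilder[Int]
builder.sizeHint(input.length)
input.foreach(builder += _)
val viaBuilder: Queue[Int] = builder.result()
```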

I will wait for a response to this PR before attempting any more modifications.

```scala
newLongSeqEncoder

/** @since 2.2.0 */
implicit def newDoubleListEncoder[T <: Seq[Double] with Product : TypeTag]: Encoder[T] =
```
Contributor:

Should this be newDoubleSeqWithProductEncoder?

Contributor Author:

Yes, it should. Thanks

@marmbrus (Contributor)

/cc @cloud-fan

@cloud-fan (Contributor)

The overall strategy LGTM.

> I had to alter and add new implicit encoders into SQLImplicits. The new encoders are for Seq with Product combination (essentially only List) to disambiguate between Seq and Product encoders.

Does Scala have a clear definition for this case? I.e., if we have implicits for both type A and type B, which implicit will be picked for a value of type A with B?

For the optimization, we can do it in a follow-up.

@michalsenkyr (Contributor Author)

None of them. The compilation will fail. That is why I had to provide those additional implicits.

```scala
scala> class Test[T]
defined class Test

scala> implicit def test1[T <: Seq[String]]: Test[T] = null
test1: [T <: Seq[String]]=> Test[T]

scala> implicit def test2[T <: Product]: Test[T] = null
test2: [T <: Product]=> Test[T]

scala> def test[T : Test](t: T) = null
test: [T](t: T)(implicit evidence$1: Test[T])Null

scala> test(List("abc"))
<console>:31: error: ambiguous implicit values:
 both method test1 of type [T <: Seq[String]]=> Test[T]
 and method test2 of type [T <: Product]=> Test[T]
 match expected type Test[List[String]]
       test(List("abc"))
```

@cloud-fan (Contributor)

How about we assign priority to the implicit rules, as in http://stackoverflow.com/questions/1886953/is-there-a-way-to-control-which-implicit-conversion-will-be-the-default-used ?

I think we should prefer the Seq encoder over the Product encoder for Seq with Product.

@michalsenkyr (Contributor Author) commented Dec 22, 2016

I actually read that before, but IDEA complained when I tried to place the Product encoder into a separate trait, so I opted for specificity.
However, I tried it again just now and even though IDEA still complains, scalac compiles it, all the tests pass and it all works.
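
For reference, a minimal sketch of that implicit-priority pattern (illustrative names, not Spark's actual traits):

```scala
class Enc[T]

// Implicits inherited from a parent trait have lower priority than those
// defined directly on the object, so the Seq encoder wins for Seq with Product.
trait LowPriorityEncoders {
  implicit def productEnc[T <: Product]: Enc[T] = new Enc[T]
}
object MyEncoders extends LowPriorityEncoders {
  implicit def seqEnc[T <: Seq[_]]: Enc[T] = new Enc[T]
}

import MyEncoders._
implicitly[Enc[List[String]]]  // resolves to seqEnc, no ambiguity
```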

```scala
val cls = mirror.runtimeClass(t.typeSymbol.asClass)
import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
import scala.util.{Try, Success}
```
Contributor:

Spark code style discourages the usage of `Try` and `Success`; can you refactor your code a little bit? I.e., move `cls.getDeclaredMethod("canBuildFrom", classOf[ClassTag[_]])` out of the `Invoke` code block.

Contributor Author:

Done. I tried looking up the code style you mentioned, but only found Databricks' Scala Code Style Guide, and that is not mentioned in the Spark docs as far as I know.


```scala
/** @since 1.6.1 */
implicit def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder()
implicit def newProductSeqEncoder[A <: Product : TypeTag, T <: Seq[A] : TypeTag]: Encoder[T] =
```
Contributor:

This is my only concern now. Can you provide more details about it?

Contributor Author:

This one is the same as all the other ones, just with Product subclasses. If you were concerned about the TypeTag on A, it was actually not needed, as T's tag already contains all the information. I tested it to be sure and removed it.


```scala
/** @since 1.6.1 */
implicit def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder()
implicit def newProductSeqEncoder[A <: Product, T <: Seq[A] : TypeTag]: Encoder[T] =
```
@cloud-fan (Contributor) commented Dec 24, 2016:

Can we just use `newProductSeqEncoder[T <: Seq[Product] : TypeTag]: Encoder[T]` here? Then we don't need the workaround implicit.

@michalsenkyr (Contributor Author) commented Dec 24, 2016:

You are right. `Seq` declares covariance on its element type, so it works and solves all the problems I was having. Thanks.
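
A small demonstration of why the single bound suffices (illustrative; relies only on `Seq[+A]` being covariant):

```scala
case class P(i: Int)

// Because Seq[+A] is covariant, List[P] <: Seq[P] <: Seq[Product],
// so one bound T <: Seq[Product] covers all these cases.
val xs: Seq[Product] = List(P(1), P(2))
val ys: Seq[Product] = Vector((1, "a"), (2, "b"))  // tuples are Products too
```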

```
@@ -130,6 +130,30 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
  checkDataset(Seq(Array(Tuple1(1))).toDS(), Array(Tuple1(1)))
}

test("arbitrary sequences") {
```
Contributor:

Let's also test nested sequences, e.g. `List(Queue(1))`, and sequences inside products, e.g. `List(1) -> Queue(1)`.

@michalsenkyr (Contributor Author) commented Jan 3, 2017:

I added some sequence-product combination tests (roughly of the shape sketched below).
Nested sequences were never supported (tried on master and 2.0.2). That would probably warrant another ticket.
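
Roughly the shape of those combinations, as shell commands (an illustrative snippet, not the exact test code):

```scala
import scala.collection.immutable.Queue

// A sequence inside a product, and a product inside a sequence:
Seq((List(1), Queue("a"))).toDS().collect()
Seq(Queue(Tuple1(1))).toDS().collect()
```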

@cloud-fan (Contributor)

retest this please

```scala
import org.apache.spark.sql.test.SharedSQLContext

case class IntClass(value: Int)

case class SeqCC(s: Seq[Int])
```
Contributor:

What is CC short for? How about SeqClass?

@cloud-fan (Contributor) commented Jan 4, 2017

LGTM. Please create 2 more tickets for the optimization you mentioned in #16240 (comment) and the nested custom collection problem.

@SparkQA commented Jan 4, 2017

Test build #70859 has finished for PR 16240 at commit efd0801.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class SeqCC(s: Seq[Int])
  • case class ListCC(l: List[Int])
  • case class QueueCC(q: Queue[Int])
  • case class ComplexCC(seq: SeqCC, list: ListCC, queue: QueueCC)

@cloud-fan (Contributor)

You need to fix MiMa:

```
[error]  * method newDoubleSeqEncoder()org.apache.spark.sql.Encoder in class org.apache.spark.sql.SQLImplicits does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.newDoubleSeqEncoder")
[error]  * method newFloatSeqEncoder()org.apache.spark.sql.Encoder in class org.apache.spark.sql.SQLImplicits does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.newFloatSeqEncoder")
[error]  * method newByteSeqEncoder()org.apache.spark.sql.Encoder in class org.apache.spark.sql.SQLImplicits does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.newByteSeqEncoder")
[error]  * method newLongSeqEncoder()org.apache.spark.sql.Encoder in class org.apache.spark.sql.SQLImplicits does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.newLongSeqEncoder")
[error]  * method newStringSeqEncoder()org.apache.spark.sql.Encoder in class org.apache.spark.sql.SQLImplicits does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.newStringSeqEncoder")
[error]  * method newIntSeqEncoder()org.apache.spark.sql.Encoder in class org.apache.spark.sql.SQLImplicits does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.newIntSeqEncoder")
[error]  * method newBooleanSeqEncoder()org.apache.spark.sql.Encoder in class org.apache.spark.sql.SQLImplicits does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.newBooleanSeqEncoder")
[error]  * method newShortSeqEncoder()org.apache.spark.sql.Encoder in class org.apache.spark.sql.SQLImplicits does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.newShortSeqEncoder")
```

@michalsenkyr (Contributor Author)

Not sure how to run the MiMa tests locally, so I tried my best to figure out what was necessary. Hope this fixes it.
The downside of the fix is that I had to restore the original methods in SQLImplicits. I removed the implicit keyword and added deprecation annotations, as only the new methods should be used from now on. Old code importing the methods explicitly should still work.

@marmbrus (Contributor) commented Jan 5, 2017

For future reference: https://github.com/apache/spark/blob/master/dev/mima (script to run mima)

@marmbrus (Contributor) commented Jan 5, 2017

ok to test

@SparkQA commented Jan 6, 2017

Test build #70943 has finished for PR 16240 at commit 68810c4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
 * @since 1.6.1
 * @deprecated use [[newIntSequenceEncoder]]
 */
def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder()
```
Contributor:

Oh, you don't need to do this; just update project/MimaExcludes.scala to add something like `ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.newDoubleSeqEncoder")`. Please see the history of MimaExcludes.scala to see how others update this file.

Contributor:

Wait, I'm not sure I agree... Do we want to break binary compatibility for libraries that might be using this method? It could even have been resolved implicitly, so it would be confusing when it breaks.

Contributor:

ah i see, makes sense

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan (Contributor)

@michalsenkyr please create 2 more tickets for the optimization you mentioned in #16240 (comment) and the nested custom collection problem.

@asfgit closed this in 903bb8e on Jan 6, 2017
cmonkey pushed a commit to cmonkey/spark that referenced this pull request on Jan 9, 2017 (the commit message repeats the PR description above; closes apache#16240 from michalsenkyr/sql-caseclass-list-fix).
uzadude pushed a commit to uzadude/spark that referenced this pull request on Jan 27, 2017 (again repeating the PR description).
ghost pushed a commit to dbtsai/spark that referenced this pull request Mar 28, 2017
## What changes were proposed in this pull request?

Optimization of arbitrary Scala sequence deserialization introduced by apache#16240.

The previous implementation constructed an array which was then converted by `to`. This required two passes in most cases.

This implementation attempts to remedy that by using `Builder`s provided by the `newBuilder` method on every Scala collection's companion object to build the resulting collection directly.
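
In plain Scala, the generated loop corresponds roughly to the following (an illustrative sketch of what the "After" codegen below emits, not the generated Java itself):

```scala
// Stand-in for the ArrayData column being deserialized.
val input: Array[Int] = Array(1, 2, 3)

// Obtain a Builder from the target collection's companion, hint the size,
// append each element, and produce the collection in a single pass.
val builder = List.newBuilder[Int]
builder.sizeHint(input.length)
var i = 0
while (i < input.length) {
  builder += input(i)
  i += 1
}
val result: List[Int] = builder.result()
```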

Example codegen for simple `List` (obtained using `Seq(List(1)).toDS().map(identity).queryExecution.debug.codegen`):

Before:

```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private boolean deserializetoobject_resultIsNull;
/* 010 */   private java.lang.Object[] deserializetoobject_argValue;
/* 011 */   private boolean MapObjects_loopIsNull1;
/* 012 */   private int MapObjects_loopValue0;
/* 013 */   private boolean deserializetoobject_resultIsNull1;
/* 014 */   private scala.collection.generic.CanBuildFrom deserializetoobject_argValue1;
/* 015 */   private UnsafeRow deserializetoobject_result;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 018 */   private scala.collection.immutable.List mapelements_argValue;
/* 019 */   private UnsafeRow mapelements_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 022 */   private scala.collection.immutable.List serializefromobject_argValue;
/* 023 */   private UnsafeRow serializefromobject_result;
/* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 025 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
/* 027 */
/* 028 */   public GeneratedIterator(Object[] references) {
/* 029 */     this.references = references;
/* 030 */   }
/* 031 */
/* 032 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 033 */     partitionIndex = index;
/* 034 */     this.inputs = inputs;
/* 035 */     inputadapter_input = inputs[0];
/* 036 */
/* 037 */     deserializetoobject_result = new UnsafeRow(1);
/* 038 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
/* 039 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 040 */
/* 041 */     mapelements_result = new UnsafeRow(1);
/* 042 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
/* 043 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 044 */
/* 045 */     serializefromobject_result = new UnsafeRow(1);
/* 046 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
/* 047 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 048 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 049 */
/* 050 */   }
/* 051 */
/* 052 */   protected void processNext() throws java.io.IOException {
/* 053 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 054 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 055 */       ArrayData inputadapter_value = inputadapter_row.getArray(0);
/* 056 */
/* 057 */       deserializetoobject_resultIsNull = false;
/* 058 */
/* 059 */       if (!deserializetoobject_resultIsNull) {
/* 060 */         ArrayData deserializetoobject_value3 = null;
/* 061 */
/* 062 */         if (!false) {
/* 063 */           Integer[] deserializetoobject_convertedArray = null;
/* 064 */           int deserializetoobject_dataLength = inputadapter_value.numElements();
/* 065 */           deserializetoobject_convertedArray = new Integer[deserializetoobject_dataLength];
/* 066 */
/* 067 */           int deserializetoobject_loopIndex = 0;
/* 068 */           while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 069 */             MapObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex));
/* 070 */             MapObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
/* 071 */
/* 072 */             if (MapObjects_loopIsNull1) {
/* 073 */               throw new RuntimeException(((java.lang.String) references[0]));
/* 074 */             }
/* 075 */             if (false) {
/* 076 */               deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
/* 077 */             } else {
/* 078 */               deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue0;
/* 079 */             }
/* 080 */
/* 081 */             deserializetoobject_loopIndex += 1;
/* 082 */           }
/* 083 */
/* 084 */           deserializetoobject_value3 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray);
/* 085 */         }
/* 086 */         boolean deserializetoobject_isNull2 = true;
/* 087 */         java.lang.Object[] deserializetoobject_value2 = null;
/* 088 */         if (!false) {
/* 089 */           deserializetoobject_isNull2 = false;
/* 090 */           if (!deserializetoobject_isNull2) {
/* 091 */             Object deserializetoobject_funcResult = null;
/* 092 */             deserializetoobject_funcResult = deserializetoobject_value3.array();
/* 093 */             if (deserializetoobject_funcResult == null) {
/* 094 */               deserializetoobject_isNull2 = true;
/* 095 */             } else {
/* 096 */               deserializetoobject_value2 = (java.lang.Object[]) deserializetoobject_funcResult;
/* 097 */             }
/* 098 */
/* 099 */           }
/* 100 */           deserializetoobject_isNull2 = deserializetoobject_value2 == null;
/* 101 */         }
/* 102 */         deserializetoobject_resultIsNull = deserializetoobject_isNull2;
/* 103 */         deserializetoobject_argValue = deserializetoobject_value2;
/* 104 */       }
/* 105 */
/* 106 */       boolean deserializetoobject_isNull1 = deserializetoobject_resultIsNull;
/* 107 */       final scala.collection.Seq deserializetoobject_value1 = deserializetoobject_resultIsNull ? null : scala.collection.mutable.WrappedArray.make(deserializetoobject_argValue);
/* 108 */       deserializetoobject_isNull1 = deserializetoobject_value1 == null;
/* 109 */       boolean deserializetoobject_isNull = true;
/* 110 */       scala.collection.immutable.List deserializetoobject_value = null;
/* 111 */       if (!deserializetoobject_isNull1) {
/* 112 */         deserializetoobject_resultIsNull1 = false;
/* 113 */
/* 114 */         if (!deserializetoobject_resultIsNull1) {
/* 115 */           boolean deserializetoobject_isNull6 = false;
/* 116 */           final scala.collection.generic.CanBuildFrom deserializetoobject_value6 = false ? null : scala.collection.immutable.List.canBuildFrom();
/* 117 */           deserializetoobject_isNull6 = deserializetoobject_value6 == null;
/* 118 */           deserializetoobject_resultIsNull1 = deserializetoobject_isNull6;
/* 119 */           deserializetoobject_argValue1 = deserializetoobject_value6;
/* 120 */         }
/* 121 */
/* 122 */         deserializetoobject_isNull = deserializetoobject_resultIsNull1;
/* 123 */         if (!deserializetoobject_isNull) {
/* 124 */           Object deserializetoobject_funcResult1 = null;
/* 125 */           deserializetoobject_funcResult1 = deserializetoobject_value1.to(deserializetoobject_argValue1);
/* 126 */           if (deserializetoobject_funcResult1 == null) {
/* 127 */             deserializetoobject_isNull = true;
/* 128 */           } else {
/* 129 */             deserializetoobject_value = (scala.collection.immutable.List) deserializetoobject_funcResult1;
/* 130 */           }
/* 131 */
/* 132 */         }
/* 133 */         deserializetoobject_isNull = deserializetoobject_value == null;
/* 134 */       }
/* 135 */
/* 136 */       boolean mapelements_isNull = true;
/* 137 */       scala.collection.immutable.List mapelements_value = null;
/* 138 */       if (!false) {
/* 139 */         mapelements_argValue = deserializetoobject_value;
/* 140 */
/* 141 */         mapelements_isNull = false;
/* 142 */         if (!mapelements_isNull) {
/* 143 */           Object mapelements_funcResult = null;
/* 144 */           mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
/* 145 */           if (mapelements_funcResult == null) {
/* 146 */             mapelements_isNull = true;
/* 147 */           } else {
/* 148 */             mapelements_value = (scala.collection.immutable.List) mapelements_funcResult;
/* 149 */           }
/* 150 */
/* 151 */         }
/* 152 */         mapelements_isNull = mapelements_value == null;
/* 153 */       }
/* 154 */
/* 155 */       if (mapelements_isNull) {
/* 156 */         throw new RuntimeException(((java.lang.String) references[2]));
/* 157 */       }
/* 158 */       serializefromobject_argValue = mapelements_value;
/* 159 */
/* 160 */       final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue);
/* 161 */       serializefromobject_holder.reset();
/* 162 */
/* 163 */       // Remember the current cursor so that we can calculate how many bytes are
/* 164 */       // written later.
/* 165 */       final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 166 */
/* 167 */       if (serializefromobject_value instanceof UnsafeArrayData) {
/* 168 */         final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 169 */         // grow the global buffer before writing data.
/* 170 */         serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 171 */         ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 172 */         serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 173 */
/* 174 */       } else {
/* 175 */         final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 176 */         serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 177 */
/* 178 */         for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 179 */           if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 180 */             serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
/* 181 */           } else {
/* 182 */             final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
/* 183 */             serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 184 */           }
/* 185 */         }
/* 186 */       }
/* 187 */
/* 188 */       serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 189 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 190 */       append(serializefromobject_result);
/* 191 */       if (shouldStop()) return;
/* 192 */     }
/* 193 */   }
/* 194 */ }
```

After:

```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private boolean CollectObjects_loopIsNull1;
/* 010 */   private int CollectObjects_loopValue0;
/* 011 */   private UnsafeRow deserializetoobject_result;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 014 */   private scala.collection.immutable.List mapelements_argValue;
/* 015 */   private UnsafeRow mapelements_result;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 018 */   private scala.collection.immutable.List serializefromobject_argValue;
/* 019 */   private UnsafeRow serializefromobject_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
/* 023 */
/* 024 */   public GeneratedIterator(Object[] references) {
/* 025 */     this.references = references;
/* 026 */   }
/* 027 */
/* 028 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 029 */     partitionIndex = index;
/* 030 */     this.inputs = inputs;
/* 031 */     inputadapter_input = inputs[0];
/* 032 */
/* 033 */     deserializetoobject_result = new UnsafeRow(1);
/* 034 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
/* 035 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 036 */
/* 037 */     mapelements_result = new UnsafeRow(1);
/* 038 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
/* 039 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 040 */
/* 041 */     serializefromobject_result = new UnsafeRow(1);
/* 042 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
/* 043 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 044 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 045 */
/* 046 */   }
/* 047 */
/* 048 */   protected void processNext() throws java.io.IOException {
/* 049 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 050 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 051 */       ArrayData inputadapter_value = inputadapter_row.getArray(0);
/* 052 */
/* 053 */       scala.collection.immutable.List deserializetoobject_value = null;
/* 054 */
/* 055 */       if (!false) {
/* 056 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
/* 057 */         scala.collection.mutable.Builder CollectObjects_builderValue2 = scala.collection.immutable.List$.MODULE$.newBuilder();
/* 058 */         CollectObjects_builderValue2.sizeHint(deserializetoobject_dataLength);
/* 059 */
/* 060 */         int deserializetoobject_loopIndex = 0;
/* 061 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 062 */           CollectObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex));
/* 063 */           CollectObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
/* 064 */
/* 065 */           if (CollectObjects_loopIsNull1) {
/* 066 */             throw new RuntimeException(((java.lang.String) references[0]));
/* 067 */           }
/* 068 */           if (false) {
/* 069 */             CollectObjects_builderValue2.$plus$eq(null);
/* 070 */           } else {
/* 071 */             CollectObjects_builderValue2.$plus$eq(CollectObjects_loopValue0);
/* 072 */           }
/* 073 */
/* 074 */           deserializetoobject_loopIndex += 1;
/* 075 */         }
/* 076 */
/* 077 */         deserializetoobject_value = (scala.collection.immutable.List) CollectObjects_builderValue2.result();
/* 078 */       }
/* 079 */
/* 080 */       boolean mapelements_isNull = true;
/* 081 */       scala.collection.immutable.List mapelements_value = null;
/* 082 */       if (!false) {
/* 083 */         mapelements_argValue = deserializetoobject_value;
/* 084 */
/* 085 */         mapelements_isNull = false;
/* 086 */         if (!mapelements_isNull) {
/* 087 */           Object mapelements_funcResult = null;
/* 088 */           mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
/* 089 */           if (mapelements_funcResult == null) {
/* 090 */             mapelements_isNull = true;
/* 091 */           } else {
/* 092 */             mapelements_value = (scala.collection.immutable.List) mapelements_funcResult;
/* 093 */           }
/* 094 */
/* 095 */         }
/* 096 */         mapelements_isNull = mapelements_value == null;
/* 097 */       }
/* 098 */
/* 099 */       if (mapelements_isNull) {
/* 100 */         throw new RuntimeException(((java.lang.String) references[2]));
/* 101 */       }
/* 102 */       serializefromobject_argValue = mapelements_value;
/* 103 */
/* 104 */       final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue);
/* 105 */       serializefromobject_holder.reset();
/* 106 */
/* 107 */       // Remember the current cursor so that we can calculate how many bytes are
/* 108 */       // written later.
/* 109 */       final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 110 */
/* 111 */       if (serializefromobject_value instanceof UnsafeArrayData) {
/* 112 */         final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 113 */         // grow the global buffer before writing data.
/* 114 */         serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 115 */         ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 116 */         serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 117 */
/* 118 */       } else {
/* 119 */         final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 120 */         serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 121 */
/* 122 */         for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 123 */           if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 124 */             serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
/* 125 */           } else {
/* 126 */             final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
/* 127 */             serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 128 */           }
/* 129 */         }
/* 130 */       }
/* 131 */
/* 132 */       serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 133 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 134 */       append(serializefromobject_result);
/* 135 */       if (shouldStop()) return;
/* 136 */     }
/* 137 */   }
/* 138 */ }
```

Benchmark results before:

```
OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH
AMD A10-4600M APU with Radeon(tm) HD Graphics
collect:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Seq                                            269 /  370          0.0      269125.8       1.0X
List                                           154 /  176          0.0      154453.5       1.7X
mutable.Queue                                  210 /  233          0.0      209691.6       1.3X
```

Benchmark results after:

```
OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH
AMD A10-4600M APU with Radeon(tm) HD Graphics
collect:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Seq                                            255 /  316          0.0      254697.3       1.0X
List                                           152 /  177          0.0      152410.0       1.7X
mutable.Queue                                  213 /  235          0.0      213470.0       1.2X
```

## How was this patch tested?

```bash
./build/mvn -DskipTests clean package && ./dev/run-tests
```

Additionally in Spark Shell:

```scala
case class QueueClass(q: scala.collection.immutable.Queue[Int])

spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect
```

Author: Michal Senkyr <mike.senkyr@gmail.com>

Closes apache#16541 from michalsenkyr/dataset-seq-builder.