From c47f1895711d5093e4e677ad7ac5f02fe9eb3b61 Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Fri, 9 Dec 2016 23:36:49 +0100
Subject: [PATCH 01/12] Added call to toList when deserializing into List

---
 .../apache/spark/sql/catalyst/ScalaReflection.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 6e20096901d99..ba2c2766cf372 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -312,12 +312,21 @@ object ScalaReflection extends ScalaReflection {
           "array",
           ObjectType(classOf[Array[Any]]))
 
-        StaticInvoke(
+        val wrappedArray = StaticInvoke(
           scala.collection.mutable.WrappedArray.getClass,
           ObjectType(classOf[Seq[_]]),
          "make",
           array :: Nil)
 
+        if (t <:< localTypeOf[List[_]]) {
+          Invoke(
+            wrappedArray,
+            "toList",
+            ObjectType(classOf[List[_]]))
+        } else {
+          wrappedArray
+        }
+
       case t if t <:< localTypeOf[Map[_, _]] =>
         // TODO: add walked type path for map
         val TypeRef(_, _, Seq(keyType, valueType)) = t

From 8c15b475fb053aef19906d6a465309d299ca7b4d Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Sat, 10 Dec 2016 00:30:49 +0100
Subject: [PATCH 02/12] Added unit test

---
 .../apache/spark/sql/catalyst/ScalaReflectionSuite.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 43b6afd9ad896..6c2c38df0e477 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -291,6 +291,15 @@ class ScalaReflectionSuite extends SparkFunSuite {
       .cls.isAssignableFrom(classOf[org.apache.spark.sql.catalyst.util.GenericArrayData]))
   }
 
+  test("SPARK-16792: Get correct deserializer for List[_]") {
+    val listDeserializer = deserializerFor[List[Int]]
+    assert(listDeserializer.dataType == ObjectType(classOf[List[_]]))
+
+    // Check that Seq[_] does not use the List[_] deserializer (would needlessly add toList overhead)
+    val seqDeserializer = deserializerFor[Seq[Int]]
+    assert(seqDeserializer.dataType != ObjectType(classOf[List[_]]))
+  }
+
   private val dataTypeForComplexData = dataTypeFor[ComplexData]
 
   private val typeOfComplexData = typeOf[ComplexData]
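NOTE (illustration, not part of any patch): a minimal sketch of the round trip
that patches 01-02 address, assuming a local SparkSession named `spark`; the
`Person` class is hypothetical. Serializing a List field already worked; it was
the read path that failed, because the generic Seq deserializer produced a
WrappedArray where a List was expected (SPARK-16792).

    import spark.implicits._

    case class Person(name: String, nicknames: List[String])

    // Before patch 01 collecting this Dataset failed at runtime; with the
    // added toList call the List field round-trips correctly.
    val ds = Seq(Person("Ann", List("A", "Annie"))).toDS()
    ds.collect()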
From b04f46e82707ed222d92f2368e551e403cc1a2c2 Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Sat, 10 Dec 2016 21:34:14 +0100
Subject: [PATCH 03/12] Added support for arbitrary sequence types.

WARNING: Breaks Seq.toDS for Seq[Product]
---
 .../spark/sql/catalyst/ScalaReflection.scala  | 35 +++++++++--
 .../sql/catalyst/ScalaReflectionSuite.scala   | 26 +++++++-
 .../org/apache/spark/sql/SQLImplicits.scala   | 59 ++++++++++++++++---
 .../spark/sql/DatasetPrimitiveSuite.scala     | 24 ++++++++
 4 files changed, 128 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index ba2c2766cf372..7210b36203367 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -318,13 +318,38 @@ object ScalaReflection extends ScalaReflection {
           "make",
           array :: Nil)
 
-        if (t <:< localTypeOf[List[_]]) {
+        if (localTypeOf[scala.collection.mutable.WrappedArray[_]] <:< t.erasure) {
+          wrappedArray
+        } else {
+          // Convert to another type using `to`
+          val cls = mirror.runtimeClass(t.typeSymbol.asClass)
+          import scala.collection.generic.CanBuildFrom
+          import scala.reflect.ClassTag
+          import scala.util.{Try, Success}
           Invoke(
             wrappedArray,
-            "toList",
-            ObjectType(classOf[List[_]]))
-        } else {
-          wrappedArray
+            "to",
+            ObjectType(cls),
+            StaticInvoke(
+              cls,
+              ObjectType(classOf[CanBuildFrom[_, _, _]]),
+              "canBuildFrom",
+              Try(cls.getDeclaredMethod("canBuildFrom", classOf[ClassTag[_]])) match {
+                case Success(_) =>
+                  StaticInvoke(
+                    ClassTag.getClass,
+                    ObjectType(classOf[ClassTag[_]]),
+                    "apply",
+                    StaticInvoke(
+                      cls,
+                      ObjectType(classOf[Class[_]]),
+                      "getClass"
+                    ) :: Nil
+                  ) :: Nil
+                case _ => Nil
+              }
+            ) :: Nil
+          )
         }
 
       case t if t <:< localTypeOf[Map[_, _]] =>

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 6c2c38df0e477..650a35398f3e8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -294,10 +294,32 @@ class ScalaReflectionSuite extends SparkFunSuite {
   test("SPARK-16792: Get correct deserializer for List[_]") {
     val listDeserializer = deserializerFor[List[Int]]
     assert(listDeserializer.dataType == ObjectType(classOf[List[_]]))
+  }
 
-    // Check that Seq[_] does not use the List[_] deserializer (would needlessly add toList overhead)
+  test("serialize and deserialize arbitrary sequence types") {
+    import scala.collection.immutable.Queue
+    val queueSerializer = serializerFor[Queue[Int]](BoundReference(
+      0, ObjectType(classOf[Queue[Int]]), nullable = false))
+    assert(queueSerializer.dataType.head.dataType ==
+      ArrayType(IntegerType, containsNull = false))
+    val queueDeserializer = deserializerFor[Queue[Int]]
+    assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]]))
+
+    import scala.collection.mutable.ArrayBuffer
+    val arrayBufferSerializer = serializerFor[ArrayBuffer[Int]](BoundReference(
+      0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false))
+    assert(arrayBufferSerializer.dataType.head.dataType ==
+      ArrayType(IntegerType, containsNull = false))
+    val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]]
+    assert(arrayBufferDeserializer.dataType == ObjectType(classOf[ArrayBuffer[_]]))
+
+    // Check whether conversion is skipped when using WrappedArray[_] supertype
+    // (would otherwise needlessly add overhead)
+    import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
     val seqDeserializer = deserializerFor[Seq[Int]]
-    assert(seqDeserializer.dataType != ObjectType(classOf[List[_]]))
+    assert(seqDeserializer.asInstanceOf[StaticInvoke].staticObject ==
+      scala.collection.mutable.WrappedArray.getClass)
+    assert(seqDeserializer.asInstanceOf[StaticInvoke].functionName == "make")
   }
 
   private val dataTypeForComplexData = dataTypeFor[ComplexData]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 73d16d8a10fd6..8b64d43dd67f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -100,31 +100,72 @@ abstract class SQLImplicits {
   // Seqs
 
   /** @since 1.6.1 */
-  implicit def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder()
+  implicit def newIntSeqEncoder[T <: Seq[Int] : TypeTag]: Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newLongSeqEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
+  implicit def newLongSeqEncoder[T <: Seq[Long] : TypeTag]: Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newDoubleSeqEncoder: Encoder[Seq[Double]] = ExpressionEncoder()
+  implicit def newDoubleSeqEncoder[T <: Seq[Double] : TypeTag]: Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newFloatSeqEncoder: Encoder[Seq[Float]] = ExpressionEncoder()
+  implicit def newFloatSeqEncoder[T <: Seq[Float] : TypeTag]: Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newByteSeqEncoder: Encoder[Seq[Byte]] = ExpressionEncoder()
+  implicit def newByteSeqEncoder[T <: Seq[Byte] : TypeTag]: Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newShortSeqEncoder: Encoder[Seq[Short]] = ExpressionEncoder()
+  implicit def newShortSeqEncoder[T <: Seq[Short] : TypeTag]: Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newBooleanSeqEncoder: Encoder[Seq[Boolean]] = ExpressionEncoder()
+  implicit def newBooleanSeqEncoder[T <: Seq[Boolean] : TypeTag]: Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newStringSeqEncoder: Encoder[Seq[String]] = ExpressionEncoder()
+  implicit def newStringSeqEncoder[T <: Seq[String] : TypeTag]: Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder()
+  implicit def newProductSeqEncoder[A <: Product : TypeTag, T <: Seq[A] : TypeTag]: Encoder[T] =
+    ExpressionEncoder()
+
+  // Seqs with product (List) disambiguation
+
+  /** @since 2.2.0 */
+  implicit def newIntSeqWithProductEncoder[T <: Seq[Int] with Product : TypeTag]: Encoder[T] =
+    newIntSeqEncoder
+
+  /** @since 2.2.0 */
+  implicit def newLongSeqWithProductEncoder[T <: Seq[Long] with Product : TypeTag]: Encoder[T] =
+    newLongSeqEncoder
+
+  /** @since 2.2.0 */
+  implicit def newDoubleListEncoder[T <: Seq[Double] with Product : TypeTag]: Encoder[T] =
+    newDoubleSeqEncoder
+
+  /** @since 2.2.0 */
+  implicit def newFloatSeqWithProductEncoder[T <: Seq[Float] with Product : TypeTag]: Encoder[T] =
+    newFloatSeqEncoder
+
+  /** @since 2.2.0 */
+  implicit def newByteSeqWithProductEncoder[T <: Seq[Byte] with Product : TypeTag]: Encoder[T] =
+    newByteSeqEncoder
+
+  /** @since 2.2.0 */
+  implicit def newShortSeqWithProductEncoder[T <: Seq[Short] with Product : TypeTag]: Encoder[T] =
+    newShortSeqEncoder
+
+  /** @since 2.2.0 */
+  implicit def newBooleanSeqWithProductEncoder[T <: Seq[Boolean] with Product : TypeTag]
+    : Encoder[T] =
+    newBooleanSeqEncoder
+
+  /** @since 2.2.0 */
+  implicit def newStringSeqWithProductEncoder[T <: Seq[String] with Product : TypeTag]: Encoder[T] =
+    newStringSeqEncoder
+
+  /** @since 2.2.0 */
+  implicit def newProductSeqWithProductEncoder
+    [A <: Product : TypeTag, T <: Seq[A] with Product : TypeTag]: Encoder[T] =
+    newProductSeqEncoder[A, T]
 
   // Arrays
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
index f8d4c61967f95..25d2b323552b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
@@ -130,6 +130,30 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
     checkDataset(Seq(Array(Tuple1(1))).toDS(), Array(Tuple1(1)))
   }
 
+  test("arbitrary sequences") {
+    import scala.collection.immutable.Queue
+    checkDataset(Seq(Queue(1)).toDS(), Queue(1))
+    checkDataset(Seq(Queue(1.toLong)).toDS(), Queue(1.toLong))
+    checkDataset(Seq(Queue(1.toDouble)).toDS(), Queue(1.toDouble))
+    checkDataset(Seq(Queue(1.toFloat)).toDS(), Queue(1.toFloat))
+    checkDataset(Seq(Queue(1.toByte)).toDS(), Queue(1.toByte))
+    checkDataset(Seq(Queue(1.toShort)).toDS(), Queue(1.toShort))
+    checkDataset(Seq(Queue(true)).toDS(), Queue(true))
+    checkDataset(Seq(Queue("test")).toDS(), Queue("test"))
+    checkDataset(Seq(Queue(Tuple1(1))).toDS(), Queue(Tuple1(1)))
+
+    import scala.collection.mutable.ArrayBuffer
+    checkDataset(Seq(ArrayBuffer(1)).toDS(), ArrayBuffer(1))
+    checkDataset(Seq(ArrayBuffer(1.toLong)).toDS(), ArrayBuffer(1.toLong))
+    checkDataset(Seq(ArrayBuffer(1.toDouble)).toDS(), ArrayBuffer(1.toDouble))
+    checkDataset(Seq(ArrayBuffer(1.toFloat)).toDS(), ArrayBuffer(1.toFloat))
+    checkDataset(Seq(ArrayBuffer(1.toByte)).toDS(), ArrayBuffer(1.toByte))
+    checkDataset(Seq(ArrayBuffer(1.toShort)).toDS(), ArrayBuffer(1.toShort))
+    checkDataset(Seq(ArrayBuffer(true)).toDS(), ArrayBuffer(true))
+    checkDataset(Seq(ArrayBuffer("test")).toDS(), ArrayBuffer("test"))
+    checkDataset(Seq(ArrayBuffer(Tuple1(1))).toDS(), ArrayBuffer(Tuple1(1)))
+  }
+
   test("package objects") {
     import packageobject._
     checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1))
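NOTE (illustration, not part of any patch): the plain Scala 2.11/2.12
equivalent of the expression tree the new deserializer emits, here for a
target type of Queue. `to` with the companion's CanBuildFrom is the standard
pre-2.13 collection conversion that the Invoke/StaticInvoke nodes above encode.

    import scala.collection.generic.CanBuildFrom
    import scala.collection.immutable.Queue
    import scala.collection.mutable.WrappedArray

    val array: Array[Any] = Array(1, 2, 3)
    // Step 1: wrap the decoded array, as the deserializer already did
    val wrapped: Seq[Any] = WrappedArray.make[Any](array)
    // Step 2 (new): convert via `to`, passing the companion's canBuildFrom,
    // which is what StaticInvoke(cls, ..., "canBuildFrom", ...) resolves to
    val cbf: CanBuildFrom[Nothing, Any, Queue[Any]] = Queue.canBuildFrom[Any]
    val queue: Queue[Any] = wrapped.to[Queue](cbf)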
From c4c01293c58e2d191a23fd65ad4a9482797efc6a Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Sat, 10 Dec 2016 22:40:15 +0100
Subject: [PATCH 04/12] Workaround for Seq.toDS with case classes (supports
 only Seq)

---
 .../src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 4 ++++
 .../scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 8b64d43dd67f9..879006a0bcef9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -167,6 +167,10 @@ abstract class SQLImplicits {
     [A <: Product : TypeTag, T <: Seq[A] with Product : TypeTag]: Encoder[T] =
     newProductSeqEncoder[A, T]
 
+  // Workaround for implicit resolution problem for Seq.toDS (only supports Seq)
+  implicit def newProductSeqOnlyEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] =
+    newProductSeqEncoder[A, Seq[A]]
+
   // Arrays
 
   /** @since 1.6.1 */

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
index 25d2b323552b5..9c883e6a02d6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
@@ -140,6 +140,8 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
     checkDataset(Seq(Queue(1.toShort)).toDS(), Queue(1.toShort))
     checkDataset(Seq(Queue(true)).toDS(), Queue(true))
     checkDataset(Seq(Queue("test")).toDS(), Queue("test"))
+    // Implicit resolution problem - encoder needs to be provided explicitly
+    implicit val queueEncoder = newProductSeqEncoder[Tuple1[Int], Queue[Tuple1[Int]]]
     checkDataset(Seq(Queue(Tuple1(1))).toDS(), Queue(Tuple1(1)))
 
     import scala.collection.mutable.ArrayBuffer
@@ -151,6 +153,8 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
     checkDataset(Seq(ArrayBuffer(1.toShort)).toDS(), ArrayBuffer(1.toShort))
     checkDataset(Seq(ArrayBuffer(true)).toDS(), ArrayBuffer(true))
    checkDataset(Seq(ArrayBuffer("test")).toDS(), ArrayBuffer("test"))
+    // Implicit resolution problem - encoder needs to be provided explicitly
+    implicit val arrayBufferEncoder = newProductSeqEncoder[Tuple1[Int], ArrayBuffer[Tuple1[Int]]]
     checkDataset(Seq(ArrayBuffer(Tuple1(1))).toDS(), ArrayBuffer(Tuple1(1)))
   }

From 96f9d9d7a5e171ed134e6f4cd62f75108da43ca4 Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Thu, 15 Dec 2016 17:30:24 +0100
Subject: [PATCH 05/12] Fixed incorrect method name

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 879006a0bcef9..0c3d1266cdc3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -138,7 +138,7 @@ abstract class SQLImplicits {
     newLongSeqEncoder
 
   /** @since 2.2.0 */
-  implicit def newDoubleListEncoder[T <: Seq[Double] with Product : TypeTag]: Encoder[T] =
+  implicit def newDoubleSeqWithProductEncoder[T <: Seq[Double] with Product : TypeTag]: Encoder[T] =
     newDoubleSeqEncoder
 
   /** @since 2.2.0 */
From b530bf5472de3e38a8c872fbf36cd165d0cce0ba Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Thu, 22 Dec 2016 23:42:13 +0100
Subject: [PATCH 06/12] Removed Seq with Product encoders in favor of
 LowPrioritySQLImplicits trait

---
 .../org/apache/spark/sql/SQLImplicits.scala | 58 +++++--------------
 1 file changed, 14 insertions(+), 44 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 0c3d1266cdc3d..008c5824f4ae0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
  * @since 1.6.0
  */
 @InterfaceStability.Evolving
-abstract class SQLImplicits {
+abstract class SQLImplicits extends LowPrioritySQLImplicits {
 
   protected def _sqlContext: SQLContext
 
@@ -45,9 +45,6 @@ abstract class SQLImplicits {
     }
   }
 
-  /** @since 1.6.0 */
-  implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]
-
   // Primitives
 
   /** @since 1.6.0 */
@@ -124,46 +121,9 @@ abstract class SQLImplicits {
   implicit def newProductSeqEncoder[A <: Product : TypeTag, T <: Seq[A] : TypeTag]: Encoder[T] =
     ExpressionEncoder()
 
-  // Seqs with product (List) disambiguation
-
-  /** @since 2.2.0 */
-  implicit def newIntSeqWithProductEncoder[T <: Seq[Int] with Product : TypeTag]: Encoder[T] =
-    newIntSeqEncoder
-
-  /** @since 2.2.0 */
-  implicit def newLongSeqWithProductEncoder[T <: Seq[Long] with Product : TypeTag]: Encoder[T] =
-    newLongSeqEncoder
-
-  /** @since 2.2.0 */
-  implicit def newDoubleSeqWithProductEncoder[T <: Seq[Double] with Product : TypeTag]: Encoder[T] =
-    newDoubleSeqEncoder
-
-  /** @since 2.2.0 */
-  implicit def newFloatSeqWithProductEncoder[T <: Seq[Float] with Product : TypeTag]: Encoder[T] =
-    newFloatSeqEncoder
-
-  /** @since 2.2.0 */
-  implicit def newByteSeqWithProductEncoder[T <: Seq[Byte] with Product : TypeTag]: Encoder[T] =
-    newByteSeqEncoder
-
-  /** @since 2.2.0 */
-  implicit def newShortSeqWithProductEncoder[T <: Seq[Short] with Product : TypeTag]: Encoder[T] =
-    newShortSeqEncoder
-
-  /** @since 2.2.0 */
-  implicit def newBooleanSeqWithProductEncoder[T <: Seq[Boolean] with Product : TypeTag]
-    : Encoder[T] =
-    newBooleanSeqEncoder
-
-  /** @since 2.2.0 */
-  implicit def newStringSeqWithProductEncoder[T <: Seq[String] with Product : TypeTag]: Encoder[T] =
-    newStringSeqEncoder
-
-  /** @since 2.2.0 */
-  implicit def newProductSeqWithProductEncoder
-    [A <: Product : TypeTag, T <: Seq[A] with Product : TypeTag]: Encoder[T] =
-    newProductSeqEncoder[A, T]
-
   // Workaround for implicit resolution problem for Seq.toDS (only supports Seq)
   implicit def newProductSeqOnlyEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] =
     newProductSeqEncoder[A, Seq[A]]
 
   // Arrays
@@ -225,3 +182,16 @@ abstract class SQLImplicits {
 
   implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name)
 
 }
+
+/**
+ * Lower priority implicit methods for converting Scala objects into [[Dataset]]s.
+ * Conflicting implicits are placed here to disambiguate resolution.
+ *
+ * Reasons for including specific implicits:
+ * newProductEncoder - to disambiguate for [[List]]s which are both [[Seq]] and [[Product]]
+ */
+trait LowPrioritySQLImplicits {
+  /** @since 1.6.0 */
+  implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]
+
+}
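NOTE (illustration, not part of any patch): the priority trick that
LowPrioritySQLImplicits relies on, in miniature (all names here are made up).
When two implicits are otherwise ambiguous, Scala prefers one defined in a
subclass over one inherited from a base trait, so List - which is both a Seq
and a Product - resolves to the sequence encoder instead of failing to compile.

    trait Enc[T]

    trait LowPriority {
      // fallback; applies to List because List also extends Product
      implicit def productEnc[T <: Product]: Enc[T] = new Enc[T] {}
    }

    object Implicits extends LowPriority {
      // defined in the subclass, so it wins for List
      implicit def seqEnc[T <: Seq[_]]: Enc[T] = new Enc[T] {}
    }

    import Implicits._
    val picked = implicitly[Enc[List[Int]]]  // resolves to seqEnc, no ambiguity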
From f75a8f1207d99daf249075eeec557c5a27c7a78a Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Fri, 23 Dec 2016 23:08:10 +0100
Subject: [PATCH 07/12] Removal of extraneous TypeTag

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 008c5824f4ae0..dfa21288cd064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -121,7 +121,7 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
   implicit def newStringSeqEncoder[T <: Seq[String] : TypeTag]: Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newProductSeqEncoder[A <: Product : TypeTag, T <: Seq[A] : TypeTag]: Encoder[T] =
+  implicit def newProductSeqEncoder[A <: Product, T <: Seq[A] : TypeTag]: Encoder[T] =
     ExpressionEncoder()
 
   // Workaround for implicit resolution problem for Seq.toDS (only supports Seq)

From 21d9e9765ba1550fc0b11adbcd8a04457186bbd1 Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Fri, 23 Dec 2016 23:49:54 +0100
Subject: [PATCH 08/12] Removed use of Try and refactored code for readability

---
 .../spark/sql/catalyst/ScalaReflection.scala | 34 +++++++++++--------
 1 file changed, 19 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 7210b36203367..5c73209235836 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -325,7 +325,24 @@ object ScalaReflection extends ScalaReflection {
           val cls = mirror.runtimeClass(t.typeSymbol.asClass)
           import scala.collection.generic.CanBuildFrom
           import scala.reflect.ClassTag
-          import scala.util.{Try, Success}
+
+          // Some canBuildFrom methods take an implicit ClassTag parameter
+          val cbfParams = try {
+            cls.getDeclaredMethod("canBuildFrom", classOf[ClassTag[_]])
+            StaticInvoke(
+              ClassTag.getClass,
+              ObjectType(classOf[ClassTag[_]]),
+              "apply",
+              StaticInvoke(
+                cls,
+                ObjectType(classOf[Class[_]]),
+                "getClass"
+              ) :: Nil
+            ) :: Nil
+          } catch {
+            case _: NoSuchMethodException => Nil
+          }
+
           Invoke(
             wrappedArray,
             "to",
@@ -334,20 +351,7 @@ object ScalaReflection extends ScalaReflection {
               cls,
              ObjectType(classOf[CanBuildFrom[_, _, _]]),
               "canBuildFrom",
-              Try(cls.getDeclaredMethod("canBuildFrom", classOf[ClassTag[_]])) match {
-                case Success(_) =>
-                  StaticInvoke(
-                    ClassTag.getClass,
-                    ObjectType(classOf[ClassTag[_]]),
-                    "apply",
-                    StaticInvoke(
-                      cls,
-                      ObjectType(classOf[Class[_]]),
-                      "getClass"
-                    ) :: Nil
-                  ) :: Nil
-                case _ => Nil
-              }
+              cbfParams
             ) :: Nil
           )
         }
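NOTE (illustration, not part of any patch): why the reflective probe is needed.
In Scala 2.11/2.12 most companion objects define canBuildFrom with no value
parameters, but array-backed collections such as WrappedArray require an
implicit ClassTag - the case that getDeclaredMethod("canBuildFrom",
classOf[ClassTag[_]]) detects:

    import scala.collection.immutable.Queue
    import scala.collection.mutable.WrappedArray
    import scala.reflect.ClassTag

    val plain = Queue.canBuildFrom[Int]                       // no ClassTag
    val tagged = WrappedArray.canBuildFrom[Int](ClassTag.Int) // needs ClassTag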
From c45bee4ac46bb36dcb2f58dc7dcbff938ee0d5e8 Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Sat, 24 Dec 2016 11:58:35 +0100
Subject: [PATCH 09/12] Removed explicit type bound in newProductSeqEncoder
 and unneeded resulting workarounds

---
 .../src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 6 +-----
 .../scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala | 4 ----
 2 files changed, 1 insertion(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index dfa21288cd064..0809295112bf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -121,13 +121,9 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
   implicit def newStringSeqEncoder[T <: Seq[String] : TypeTag]: Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newProductSeqEncoder[A <: Product, T <: Seq[A] : TypeTag]: Encoder[T] =
+  implicit def newProductSeqEncoder[T <: Seq[Product] : TypeTag]: Encoder[T] =
     ExpressionEncoder()
 
-  // Workaround for implicit resolution problem for Seq.toDS (only supports Seq)
-  implicit def newProductSeqOnlyEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] =
-    newProductSeqEncoder[A, Seq[A]]
-
   // Arrays
 
   /** @since 1.6.1 */

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
index 9c883e6a02d6a..25d2b323552b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
@@ -140,8 +140,6 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
     checkDataset(Seq(Queue(1.toShort)).toDS(), Queue(1.toShort))
     checkDataset(Seq(Queue(true)).toDS(), Queue(true))
     checkDataset(Seq(Queue("test")).toDS(), Queue("test"))
-    // Implicit resolution problem - encoder needs to be provided explicitly
-    implicit val queueEncoder = newProductSeqEncoder[Tuple1[Int], Queue[Tuple1[Int]]]
     checkDataset(Seq(Queue(Tuple1(1))).toDS(), Queue(Tuple1(1)))
 
     import scala.collection.mutable.ArrayBuffer
@@ -153,8 +151,6 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
     checkDataset(Seq(ArrayBuffer(1.toShort)).toDS(), ArrayBuffer(1.toShort))
     checkDataset(Seq(ArrayBuffer(true)).toDS(), ArrayBuffer(true))
     checkDataset(Seq(ArrayBuffer("test")).toDS(), ArrayBuffer("test"))
-    // Implicit resolution problem - encoder needs to be provided explicitly
-    implicit val arrayBufferEncoder = newProductSeqEncoder[Tuple1[Int], ArrayBuffer[Tuple1[Int]]]
     checkDataset(Seq(ArrayBuffer(Tuple1(1))).toDS(), ArrayBuffer(Tuple1(1)))
   }

From efd0801e24088b90c1157de0cb0bfe8159aeaac5 Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Fri, 30 Dec 2016 15:33:04 +0100
Subject: [PATCH 10/12] Sequence-product combination tests

---
 .../spark/sql/DatasetPrimitiveSuite.scala | 47 +++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 45 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
index 25d2b323552b5..5fdd94c2f55ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
@@ -17,10 +17,21 @@
 
 package org.apache.spark.sql
 
+import scala.collection.immutable.Queue
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.test.SharedSQLContext
 
 case class IntClass(value: Int)
 
+case class SeqCC(s: Seq[Int])
+
+case class ListCC(l: List[Int])
+
+case class QueueCC(q: Queue[Int])
+
+case class ComplexCC(seq: SeqCC, list: ListCC, queue: QueueCC)
+
 package object packageobject {
   case class PackageClass(value: Int)
 }
@@ -131,7 +142,6 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
   }
 
   test("arbitrary sequences") {
-    import scala.collection.immutable.Queue
     checkDataset(Seq(Queue(1)).toDS(), Queue(1))
     checkDataset(Seq(Queue(1.toLong)).toDS(), Queue(1.toLong))
     checkDataset(Seq(Queue(1.toDouble)).toDS(), Queue(1.toDouble))
@@ -142,7 +152,6 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
     checkDataset(Seq(Queue("test")).toDS(), Queue("test"))
     checkDataset(Seq(Queue(Tuple1(1))).toDS(), Queue(Tuple1(1)))
 
-    import scala.collection.mutable.ArrayBuffer
     checkDataset(Seq(ArrayBuffer(1)).toDS(), ArrayBuffer(1))
     checkDataset(Seq(ArrayBuffer(1.toLong)).toDS(), ArrayBuffer(1.toLong))
     checkDataset(Seq(ArrayBuffer(1.toDouble)).toDS(), ArrayBuffer(1.toDouble))
@@ -154,6 +163,40 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
     checkDataset(Seq(ArrayBuffer(Tuple1(1))).toDS(), ArrayBuffer(Tuple1(1)))
   }
 
+  test("sequence and product combinations") {
+    // Case classes
+    checkDataset(Seq(SeqCC(Seq(1))).toDS(), SeqCC(Seq(1)))
+    checkDataset(Seq(Seq(SeqCC(Seq(1)))).toDS(), Seq(SeqCC(Seq(1))))
+    checkDataset(Seq(List(SeqCC(Seq(1)))).toDS(), List(SeqCC(Seq(1))))
+    checkDataset(Seq(Queue(SeqCC(Seq(1)))).toDS(), Queue(SeqCC(Seq(1))))
+
+    checkDataset(Seq(ListCC(List(1))).toDS(), ListCC(List(1)))
+    checkDataset(Seq(Seq(ListCC(List(1)))).toDS(), Seq(ListCC(List(1))))
+    checkDataset(Seq(List(ListCC(List(1)))).toDS(), List(ListCC(List(1))))
+    checkDataset(Seq(Queue(ListCC(List(1)))).toDS(), Queue(ListCC(List(1))))
+
+    checkDataset(Seq(QueueCC(Queue(1))).toDS(), QueueCC(Queue(1)))
+    checkDataset(Seq(Seq(QueueCC(Queue(1)))).toDS(), Seq(QueueCC(Queue(1))))
+    checkDataset(Seq(List(QueueCC(Queue(1)))).toDS(), List(QueueCC(Queue(1))))
+    checkDataset(Seq(Queue(QueueCC(Queue(1)))).toDS(), Queue(QueueCC(Queue(1))))
+
+    val complexCC = ComplexCC(SeqCC(Seq(1)), ListCC(List(2)), QueueCC(Queue(3)))
+    checkDataset(Seq(complexCC).toDS(), complexCC)
+    checkDataset(Seq(Seq(complexCC)).toDS(), Seq(complexCC))
+    checkDataset(Seq(List(complexCC)).toDS(), List(complexCC))
+    checkDataset(Seq(Queue(complexCC)).toDS(), Queue(complexCC))
+
+    // Tuples
+    checkDataset(Seq(Seq(1) -> Seq(2)).toDS(), Seq(1) -> Seq(2))
+    checkDataset(Seq(List(1) -> Queue(2)).toDS(), List(1) -> Queue(2))
+    checkDataset(Seq(List(Seq("test1") -> List(Queue("test2")))).toDS(),
+      List(Seq("test1") -> List(Queue("test2"))))
+
+    // Complex
+    checkDataset(Seq(ListCC(List(1)) -> Queue("test" -> SeqCC(Seq(2)))).toDS(),
+      ListCC(List(1)) -> Queue("test" -> SeqCC(Seq(2))))
+  }
+
   test("package objects") {
     import packageobject._
     checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1))
From 2ad7eb0c89ce0d359300780d445bb37000764a29 Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Thu, 5 Jan 2017 18:57:17 +0100
Subject: [PATCH 11/12] Test case classes renamed

---
 .../spark/sql/DatasetPrimitiveSuite.scala | 52 +++++++++----------
 1 file changed, 26 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
index 5fdd94c2f55ce..6b50cb3e48c76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
@@ -24,13 +24,13 @@ import org.apache.spark.sql.test.SharedSQLContext
 
 case class IntClass(value: Int)
 
-case class SeqCC(s: Seq[Int])
+case class SeqClass(s: Seq[Int])
 
-case class ListCC(l: List[Int])
+case class ListClass(l: List[Int])
 
-case class QueueCC(q: Queue[Int])
+case class QueueClass(q: Queue[Int])
 
-case class ComplexCC(seq: SeqCC, list: ListCC, queue: QueueCC)
+case class ComplexClass(seq: SeqClass, list: ListClass, queue: QueueClass)
 
 package object packageobject {
   case class PackageClass(value: Int)
@@ -165,26 +165,26 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
 
   test("sequence and product combinations") {
     // Case classes
-    checkDataset(Seq(SeqCC(Seq(1))).toDS(), SeqCC(Seq(1)))
-    checkDataset(Seq(Seq(SeqCC(Seq(1)))).toDS(), Seq(SeqCC(Seq(1))))
-    checkDataset(Seq(List(SeqCC(Seq(1)))).toDS(), List(SeqCC(Seq(1))))
-    checkDataset(Seq(Queue(SeqCC(Seq(1)))).toDS(), Queue(SeqCC(Seq(1))))
-
-    checkDataset(Seq(ListCC(List(1))).toDS(), ListCC(List(1)))
-    checkDataset(Seq(Seq(ListCC(List(1)))).toDS(), Seq(ListCC(List(1))))
-    checkDataset(Seq(List(ListCC(List(1)))).toDS(), List(ListCC(List(1))))
-    checkDataset(Seq(Queue(ListCC(List(1)))).toDS(), Queue(ListCC(List(1))))
-
-    checkDataset(Seq(QueueCC(Queue(1))).toDS(), QueueCC(Queue(1)))
-    checkDataset(Seq(Seq(QueueCC(Queue(1)))).toDS(), Seq(QueueCC(Queue(1))))
-    checkDataset(Seq(List(QueueCC(Queue(1)))).toDS(), List(QueueCC(Queue(1))))
-    checkDataset(Seq(Queue(QueueCC(Queue(1)))).toDS(), Queue(QueueCC(Queue(1))))
-
-    val complexCC = ComplexCC(SeqCC(Seq(1)), ListCC(List(2)), QueueCC(Queue(3)))
-    checkDataset(Seq(complexCC).toDS(), complexCC)
-    checkDataset(Seq(Seq(complexCC)).toDS(), Seq(complexCC))
-    checkDataset(Seq(List(complexCC)).toDS(), List(complexCC))
-    checkDataset(Seq(Queue(complexCC)).toDS(), Queue(complexCC))
+    checkDataset(Seq(SeqClass(Seq(1))).toDS(), SeqClass(Seq(1)))
+    checkDataset(Seq(Seq(SeqClass(Seq(1)))).toDS(), Seq(SeqClass(Seq(1))))
+    checkDataset(Seq(List(SeqClass(Seq(1)))).toDS(), List(SeqClass(Seq(1))))
+    checkDataset(Seq(Queue(SeqClass(Seq(1)))).toDS(), Queue(SeqClass(Seq(1))))
+
+    checkDataset(Seq(ListClass(List(1))).toDS(), ListClass(List(1)))
+    checkDataset(Seq(Seq(ListClass(List(1)))).toDS(), Seq(ListClass(List(1))))
+    checkDataset(Seq(List(ListClass(List(1)))).toDS(), List(ListClass(List(1))))
+    checkDataset(Seq(Queue(ListClass(List(1)))).toDS(), Queue(ListClass(List(1))))
+
+    checkDataset(Seq(QueueClass(Queue(1))).toDS(), QueueClass(Queue(1)))
+    checkDataset(Seq(Seq(QueueClass(Queue(1)))).toDS(), Seq(QueueClass(Queue(1))))
+    checkDataset(Seq(List(QueueClass(Queue(1)))).toDS(), List(QueueClass(Queue(1))))
+    checkDataset(Seq(Queue(QueueClass(Queue(1)))).toDS(), Queue(QueueClass(Queue(1))))
+
+    val complex = ComplexClass(SeqClass(Seq(1)), ListClass(List(2)), QueueClass(Queue(3)))
+    checkDataset(Seq(complex).toDS(), complex)
+    checkDataset(Seq(Seq(complex)).toDS(), Seq(complex))
+    checkDataset(Seq(List(complex)).toDS(), List(complex))
+    checkDataset(Seq(Queue(complex)).toDS(), Queue(complex))
 
     // Tuples
     checkDataset(Seq(Seq(1) -> Seq(2)).toDS(), Seq(1) -> Seq(2))
@@ -193,8 +193,8 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
       List(Seq("test1") -> List(Queue("test2"))))
 
     // Complex
-    checkDataset(Seq(ListCC(List(1)) -> Queue("test" -> SeqCC(Seq(2)))).toDS(),
-      ListCC(List(1)) -> Queue("test" -> SeqCC(Seq(2))))
+    checkDataset(Seq(ListClass(List(1)) -> Queue("test" -> SeqClass(Seq(2)))).toDS(),
+      ListClass(List(1)) -> Queue("test" -> SeqClass(Seq(2))))
   }
 
   test("package objects") {
From 68810c4efb445d237604b661511b161a1c42a9bd Mon Sep 17 00:00:00 2001
From: Michal Senkyr
Date: Thu, 5 Jan 2017 18:58:20 +0100
Subject: [PATCH 12/12] Fix MiMa tests

---
 .../org/apache/spark/sql/SQLImplicits.scala | 98 +++++++++++++++++++++++-----
 1 file changed, 80 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 0809295112bf5..2826fdf11b928 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -96,32 +96,94 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
 
   // Seqs
 
-  /** @since 1.6.1 */
-  implicit def newIntSeqEncoder[T <: Seq[Int] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  /**
+   * @since 1.6.1
+   * @deprecated use [[newIntSequenceEncoder]]
+   */
+  def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder()
 
-  /** @since 1.6.1 */
-  implicit def newLongSeqEncoder[T <: Seq[Long] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  /**
+   * @since 1.6.1
+   * @deprecated use [[newLongSequenceEncoder]]
+   */
+  def newLongSeqEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
 
-  /** @since 1.6.1 */
-  implicit def newDoubleSeqEncoder[T <: Seq[Double] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  /**
+   * @since 1.6.1
+   * @deprecated use [[newDoubleSequenceEncoder]]
+   */
+  def newDoubleSeqEncoder: Encoder[Seq[Double]] = ExpressionEncoder()
 
-  /** @since 1.6.1 */
-  implicit def newFloatSeqEncoder[T <: Seq[Float] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  /**
+   * @since 1.6.1
+   * @deprecated use [[newFloatSequenceEncoder]]
+   */
+  def newFloatSeqEncoder: Encoder[Seq[Float]] = ExpressionEncoder()
 
-  /** @since 1.6.1 */
-  implicit def newByteSeqEncoder[T <: Seq[Byte] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  /**
+   * @since 1.6.1
+   * @deprecated use [[newByteSequenceEncoder]]
+   */
+  def newByteSeqEncoder: Encoder[Seq[Byte]] = ExpressionEncoder()
 
-  /** @since 1.6.1 */
-  implicit def newShortSeqEncoder[T <: Seq[Short] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  /**
+   * @since 1.6.1
+   * @deprecated use [[newShortSequenceEncoder]]
+   */
+  def newShortSeqEncoder: Encoder[Seq[Short]] = ExpressionEncoder()
 
-  /** @since 1.6.1 */
-  implicit def newBooleanSeqEncoder[T <: Seq[Boolean] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  /**
+   * @since 1.6.1
+   * @deprecated use [[newBooleanSequenceEncoder]]
+   */
+  def newBooleanSeqEncoder: Encoder[Seq[Boolean]] = ExpressionEncoder()
 
-  /** @since 1.6.1 */
-  implicit def newStringSeqEncoder[T <: Seq[String] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  /**
+   * @since 1.6.1
+   * @deprecated use [[newStringSequenceEncoder]]
+   */
+  def newStringSeqEncoder: Encoder[Seq[String]] = ExpressionEncoder()
 
-  /** @since 1.6.1 */
-  implicit def newProductSeqEncoder[T <: Seq[Product] : TypeTag]: Encoder[T] =
+  /**
+   * @since 1.6.1
+   * @deprecated use [[newProductSequenceEncoder]]
+   */
+  implicit def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] =
     ExpressionEncoder()
+
+  /** @since 2.2.0 */
+  implicit def newIntSequenceEncoder[T <: Seq[Int] : TypeTag]: Encoder[T] =
+    ExpressionEncoder()
+
+  /** @since 2.2.0 */
+  implicit def newLongSequenceEncoder[T <: Seq[Long] : TypeTag]: Encoder[T] =
+    ExpressionEncoder()
+
+  /** @since 2.2.0 */
+  implicit def newDoubleSequenceEncoder[T <: Seq[Double] : TypeTag]: Encoder[T] =
+    ExpressionEncoder()
+
+  /** @since 2.2.0 */
+  implicit def newFloatSequenceEncoder[T <: Seq[Float] : TypeTag]: Encoder[T] =
+    ExpressionEncoder()
+
+  /** @since 2.2.0 */
+  implicit def newByteSequenceEncoder[T <: Seq[Byte] : TypeTag]: Encoder[T] =
+    ExpressionEncoder()
+
+  /** @since 2.2.0 */
+  implicit def newShortSequenceEncoder[T <: Seq[Short] : TypeTag]: Encoder[T] =
+    ExpressionEncoder()
+
+  /** @since 2.2.0 */
+  implicit def newBooleanSequenceEncoder[T <: Seq[Boolean] : TypeTag]: Encoder[T] =
+    ExpressionEncoder()
+
+  /** @since 2.2.0 */
+  implicit def newStringSequenceEncoder[T <: Seq[String] : TypeTag]: Encoder[T] =
+    ExpressionEncoder()
+
+  /** @since 2.2.0 */
+  implicit def newProductSequenceEncoder[T <: Seq[Product] : TypeTag]: Encoder[T] =
+    ExpressionEncoder()
 
   // Arrays
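NOTE (illustration, not part of any patch): an end-to-end sketch of what the
finished series enables, assuming a SparkSession named `spark` built with
these changes (targeted at 2.2.0). Arbitrary Seq subtypes round-trip through
Datasets, mirroring the test suites above, while the old Seq-only encoder
names survive as deprecated non-implicit methods for MiMa binary compatibility.

    import scala.collection.immutable.Queue
    import scala.collection.mutable.ArrayBuffer
    import spark.implicits._

    // Picked up by the new newIntSequenceEncoder[T <: Seq[Int]] etc.
    Seq(Queue(1, 2, 3)).toDS().collect()         // Array(Queue(1, 2, 3))
    Seq(ArrayBuffer("a", "b")).toDS().collect()  // Array(ArrayBuffer(a, b))

    // List fields inside case classes now deserialize back as List
    case class Wrap(xs: List[Int])
    Seq(Wrap(List(1, 2))).toDS().collect()       // Array(Wrap(List(1, 2)))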