diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 4f18ff3268443..e9cf49a7ca4bf 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -79,6 +79,8 @@ license: |
 
   - In Spark 3.2, `TRANSFORM` operator can't support alias in inputs. In Spark 3.1 and earlier, we can write script transform like `SELECT TRANSFORM(a AS c1, b AS c2) USING 'cat' FROM TBL`.
 
+  - In Spark 3.2, the `TRANSFORM` operator supports `ArrayType/MapType/StructType` without Hive SerDe. In this mode, `StructsToJson` is used to convert `ArrayType/MapType/StructType` columns to `STRING`, and `JsonToStructs` is used to parse `STRING` back into `ArrayType/MapType/StructType` output columns. In Spark 3.1, Spark only supports casting `ArrayType/MapType/StructType` columns to `STRING`, but cannot parse `STRING` into `ArrayType/MapType/StructType` output columns.
+
 ## Upgrading from Spark SQL 3.0 to 3.1
 
   - In Spark 3.1, statistical aggregation function includes `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, `corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 669b90f4d06dd..4d594f9f18483 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkFiles, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, JsonToStructs, Literal, StructsToJson, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
@@ -47,7 +47,14 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
   def ioschema: ScriptTransformationIOSchema
 
   protected lazy val inputExpressionsWithoutSerde: Seq[Expression] = {
-    input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone))
+    input.map { in =>
+      in.dataType match {
+        case _: ArrayType | _: MapType | _: StructType =>
+          new StructsToJson(ioschema.inputSerdeProps.toMap, in)
+            .withTimeZone(conf.sessionLocalTimeZone)
+        case _ => Cast(in, StringType).withTimeZone(conf.sessionLocalTimeZone)
+      }
+    }
   }
 
   override def producedAttributes: AttributeSet = outputSet -- inputSet
@@ -220,6 +227,11 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       case CalendarIntervalType => wrapperConvertException(
         data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
         converter)
+      case _: ArrayType | _: MapType | _: StructType =>
+        val complexTypeFactory = JsonToStructs(attr.dataType,
+          ioschema.outputSerdeProps.toMap, Literal(null), Some(conf.sessionLocalTimeZone))
+        wrapperConvertException(data =>
+          complexTypeFactory.nullSafeEval(UTF8String.fromString(data)), any => any)
       case udt: UserDefinedType[_] =>
         wrapperConvertException(data => udt.deserialize(data), converter)
       case dt =>
diff --git a/sql/core/src/test/resources/sql-tests/inputs/transform.sql b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
index e3ae34ee89300..7419ca1bd0a80 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/transform.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
@@ -264,7 +264,7 @@ WHERE a <= 4
 WINDOW w AS (PARTITION BY b ORDER BY a);
 
 SELECT TRANSFORM(b, MAX(a), CAST(SUM(c) AS STRING), myCol, myCol2)
-  USING 'cat' AS (a, b, c, d, e)
+  USING 'cat' AS (a STRING, b STRING, c STRING, d ARRAY<INT>, e STRING)
 FROM script_trans
 LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol
 LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2
diff --git a/sql/core/src/test/resources/sql-tests/results/transform.sql.out b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
index c20ec4ca20715..1d7e9cdb430e0 100644
--- a/sql/core/src/test/resources/sql-tests/results/transform.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
@@ -495,7 +495,7 @@ struct
 -- !query
 SELECT TRANSFORM(b, MAX(a), CAST(SUM(c) AS STRING), myCol, myCol2)
-  USING 'cat' AS (a, b, c, d, e)
+  USING 'cat' AS (a STRING, b STRING, c STRING, d ARRAY<INT>, e STRING)
 FROM script_trans
 LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol
 LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2
@@ -503,11 +503,11 @@ WHERE a <= 4
 GROUP BY b, myCol, myCol2
 HAVING max(a) > 1
 -- !query schema
-struct<a:string,b:string,c:string,d:string,e:string>
+struct<a:string,b:string,c:string,d:array<int>,e:string>
 -- !query output
-5 4 6 [1, 2, 3] 1
-5 4 6 [1, 2, 3] 2
-5 4 6 [1, 2, 3] 3
+5 4 6 [1,2,3] 1
+5 4 6 [1,2,3] 2
+5 4 6 [1,2,3] 3
 
 
 -- !query
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
index 2011d057338c1..67be6716fb8df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
@@ -302,14 +302,16 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
         script = "cat",
         output = Seq(
           AttributeReference("a", CalendarIntervalType)(),
-          AttributeReference("b", StringType)(),
-          AttributeReference("c", StringType)(),
-          AttributeReference("d", StringType)(),
+          AttributeReference("b", ArrayType(IntegerType))(),
+          AttributeReference("c", MapType(StringType, IntegerType))(),
+          AttributeReference("d", StructType(
+            Array(StructField("_1", IntegerType),
+              StructField("_2", IntegerType))))(),
           AttributeReference("e", new SimpleTupleUDT)()),
         child = child,
         ioschema = defaultIOSchema
       ),
-      df.select('a, 'b.cast("string"), 'c.cast("string"), 'd.cast("string"), 'e).collect())
+      df.select('a, 'b, 'c, 'd, 'e).collect())
   }
 }
@@ -471,6 +473,60 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
     }
   }
 
+  test("SPARK-31936: Script transform support ArrayType/MapType/StructType (no serde)") {
+    assume(TestUtils.testCommandAvailable("python"))
+    withTempView("v") {
+      val df = Seq(
+        (Array(0, 1, 2), Array(Array(0, 1), Array(2)),
+          Map("a" -> 1), Map("b" -> Array("a", "b"))),
+        (Array(3, 4, 5), Array(Array(3, 4), Array(5)),
+          Map("b" -> 2), Map("c" -> Array("c", "d"))),
+        (Array(6, 7, 8), Array(Array(6, 7), Array(8)),
+          Map("c" -> 3), Map("d" -> Array("e", "f")))
+      ).toDF("a", "b", "c", "d")
+        .select('a, 'b, 'c, 'd,
+          struct('a, 'b).as("e"),
+          struct('a, 'd).as("f"),
+          struct(struct('a, 'b), struct('a, 'd)).as("g")
+        )
+
+      checkAnswer(
+        df,
+        (child: SparkPlan) => createScriptTransformationExec(
+          input = Seq(
+            df.col("a").expr,
+            df.col("b").expr,
+            df.col("c").expr,
+            df.col("d").expr,
+            df.col("e").expr,
+            df.col("f").expr,
+            df.col("g").expr),
+          script = "cat",
+          output = Seq(
+            AttributeReference("a", ArrayType(IntegerType))(),
+            AttributeReference("b", ArrayType(ArrayType(IntegerType)))(),
+            AttributeReference("c", MapType(StringType, IntegerType))(),
+            AttributeReference("d", MapType(StringType, ArrayType(StringType)))(),
+            AttributeReference("e", StructType(
+              Array(StructField("a", ArrayType(IntegerType)),
+                StructField("b", ArrayType(ArrayType(IntegerType))))))(),
+            AttributeReference("f", StructType(
+              Array(StructField("a", ArrayType(IntegerType)),
+                StructField("d", MapType(StringType, ArrayType(StringType))))))(),
+            AttributeReference("g", StructType(
+              Array(StructField("col1", StructType(
+                Array(StructField("a", ArrayType(IntegerType)),
+                  StructField("b", ArrayType(ArrayType(IntegerType)))))),
+                StructField("col2", StructType(
+                  Array(StructField("a", ArrayType(IntegerType)),
+                    StructField("d", MapType(StringType, ArrayType(StringType)))))))))()),
+          child = child,
+          ioschema = defaultIOSchema
+        ),
+        df.select('a, 'b, 'c, 'd, 'e, 'f, 'g).collect())
+    }
+  }
+
   test("SPARK-33934: Add SparkFile's root dir to env property PATH") {
     assume(TestUtils.testCommandAvailable("python"))
     val scriptFilePath = copyAndGetResourceFile("test_script.py", ".py").getAbsoluteFile
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
index 6ff7c5d6d2f3a..e5aa3bfacd9ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.test.SharedSparkSession
@@ -59,44 +59,4 @@ class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with
       assert(e.contains("TRANSFORM with serde is only supported in hive mode"))
     }
   }
-
-  test("SPARK-32106: TRANSFORM doesn't support ArrayType/MapType/StructType " +
-    "as output data type (no serde)") {
-    assume(TestUtils.testCommandAvailable("/bin/bash"))
-    // check for ArrayType
-    val e1 = intercept[SparkException] {
-      sql(
-        """
-          |SELECT TRANSFORM(a)
-          |USING 'cat' AS (a array<int>)
-          |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
-        """.stripMargin).collect()
-    }.getMessage
-    assert(e1.contains("SparkScriptTransformation without serde does not support" +
-      " ArrayType as output data type"))
-
-    // check for MapType
-    val e2 = intercept[SparkException] {
-      sql(
-        """
-          |SELECT TRANSFORM(b)
-          |USING 'cat' AS (b map<string, int>)
-          |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
-        """.stripMargin).collect()
-    }.getMessage
-    assert(e2.contains("SparkScriptTransformation without serde does not support" +
-      " MapType as output data type"))
-
-    // check for StructType
-    val e3 = intercept[SparkException] {
-      sql(
-        """
-          |SELECT TRANSFORM(c)
-          |USING 'cat' AS (c struct<col1:int, col2:string>)
-          |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
-        """.stripMargin).collect()
-    }.getMessage
-    assert(e3.contains("SparkScriptTransformation without serde does not support" +
-      " StructType as output data type"))
-  }
 }
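The migration-guide note at the top of this diff can be illustrated with a small query. The following is a hypothetical usage sketch, not part of the patch: it assumes the default no-SerDe `TRANSFORM` mode and an inline `VALUES` source; the table alias `t` and column name `a` are arbitrary.

```sql
-- With the change above, the ARRAY<INT> input is serialized for the script as the
-- JSON string [1,2,3]; because the output column is declared ARRAY<INT>, the string
-- echoed back by 'cat' is parsed into an array value instead of being left as STRING.
SELECT TRANSFORM(a)
  USING 'cat' AS (a ARRAY<INT>)
FROM VALUES (array(1, 2, 3)) t(a);
```

In Spark 3.1 a query like this failed with "SparkScriptTransformation without serde does not support ArrayType as output data type", which is exactly the behavior asserted by the test removed above; with this patch it returns a single row typed `array<int>`, matching the updated `transform.sql` golden output.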