diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 2fc7deb0858af..12b6934f58c8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -188,7 +188,8 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       case StringType => wrapperConvertException(data => data, converter)
       case BooleanType => wrapperConvertException(data => data.toBoolean, converter)
       case ByteType => wrapperConvertException(data => data.toByte, converter)
-      case BinaryType => wrapperConvertException(data => data.getBytes, converter)
+      case BinaryType =>
+        wrapperConvertException(data => UTF8String.fromString(data).getBytes, converter)
       case IntegerType => wrapperConvertException(data => data.toInt, converter)
       case ShortType => wrapperConvertException(data => data.toShort, converter)
       case LongType => wrapperConvertException(data => data.toLong, converter)
diff --git a/sql/core/src/test/resources/sql-tests/inputs/transform.sql b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
index 586df6c0cba59..222be1b836ef3 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/transform.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
@@ -87,3 +87,60 @@ MAP a, b USING 'cat' AS (a, b) FROM t;
 
 -- transform use REDUCE
 REDUCE a, b USING 'cat' AS (a, b) FROM t;
+
+-- transform with defined row format delimiter
+SELECT TRANSFORM(a, b, c, null)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+USING 'cat' AS (a, b, c, d)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+FROM t;
+
+
+SELECT TRANSFORM(a, b, c, null)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+USING 'cat' AS (d)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '||'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+FROM t;
+
+-- SPARK-31937 transform with defined row format delimiter
+--SELECT TRANSFORM(a, b, c, d, e, null)
+--ROW FORMAT DELIMITED
+--FIELDS TERMINATED BY '|'
+--COLLECTION ITEMS TERMINATED BY '&'
+--MAP KEYS TERMINATED BY '*'
+--LINES TERMINATED BY '\n'
+--NULL DEFINED AS 'NULL'
+--USING 'cat' AS (a, b, c, d, e, f)
+--ROW FORMAT DELIMITED
+--FIELDS TERMINATED BY '|'
+--COLLECTION ITEMS TERMINATED BY '&'
+--MAP KEYS TERMINATED BY '*'
+--LINES TERMINATED BY '\n'
+--NULL DEFINED AS 'NULL'
+--FROM VALUES (1, 1.23, array(1, 2, 3), map(1, '1'), struct(1, '1')) t(a, b, c, d, e);
+--
+--SELECT TRANSFORM(a, b, c, d, e, null)
+--ROW FORMAT DELIMITED
+--FIELDS TERMINATED BY '|'
+--COLLECTION ITEMS TERMINATED BY '&'
+--MAP KEYS TERMINATED BY '*'
+--LINES TERMINATED BY '\n'
+--NULL DEFINED AS 'NULL'
+--USING 'cat' AS (a)
+--ROW FORMAT DELIMITED
+--FIELDS TERMINATED BY '||'
+--LINES TERMINATED BY '\n'
+--NULL DEFINED AS 'NULL'
+--FROM VALUES (1, 1.23, array(1, 2, 3), map(1, '1'), struct(1, '1')) t(a, b, c, d, e);
diff --git a/sql/core/src/test/resources/sql-tests/results/transform.sql.out b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
index 37214b90b86fd..744d6384f9c45 100644
--- a/sql/core/src/test/resources/sql-tests/results/transform.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 13
+-- Number of queries: 15
 
 
 -- !query
@@ -184,3 +184,41 @@ struct<a:string,b:string>
 1	true
 2	false
 3	true
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, null)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+USING 'cat' AS (a, b, c, d)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+FROM t
+-- !query schema
+struct<a:string,b:string,c:string,d:string>
+-- !query output
+1 | true |
+2 | false |
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, null)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+USING 'cat' AS (d)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '||'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+FROM t
+-- !query schema
+struct<d:string>
+-- !query output
+1
+2
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
index d8fd5ea0f5bd1..101c9a5c899db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
@@ -197,23 +197,68 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
     assert(uncaughtExceptionHandler.exception.isEmpty)
   }
 
-  test("SPARK-32106: TRANSFORM should support all data types as input (no serde)") {
+  def testBasicInputDataTypesWith(serde: ScriptTransformationIOSchema, testName: String): Unit = {
+    test(s"SPARK-32106: TRANSFORM should support basic data types as input ($testName)") {
+      assume(TestUtils.testCommandAvailable("python"))
+      withTempView("v") {
+        val df = Seq(
+          (1, "1", 1.0f, 1.0, 11.toByte, BigDecimal(1.0), new Timestamp(1),
+            new Date(2020, 7, 1), true),
+          (2, "2", 2.0f, 2.0, 22.toByte, BigDecimal(2.0), new Timestamp(2),
+            new Date(2020, 7, 2), true),
+          (3, "3", 3.0f, 3.0, 33.toByte, BigDecimal(3.0), new Timestamp(3),
+            new Date(2020, 7, 3), false)
+        ).toDF("a", "b", "c", "d", "e", "f", "g", "h", "i")
+          .withColumn("j", lit("abc").cast("binary"))
+
+        checkAnswer(
+          df,
+          (child: SparkPlan) => createScriptTransformationExec(
+            input = Seq(
+              df.col("a").expr,
+              df.col("b").expr,
+              df.col("c").expr,
+              df.col("d").expr,
+              df.col("e").expr,
+              df.col("f").expr,
+              df.col("g").expr,
+              df.col("h").expr,
+              df.col("i").expr,
+              df.col("j").expr),
+            script = "cat",
+            output = Seq(
+              AttributeReference("a", IntegerType)(),
+              AttributeReference("b", StringType)(),
+              AttributeReference("c", FloatType)(),
+              AttributeReference("d", DoubleType)(),
+              AttributeReference("e", ByteType)(),
+              AttributeReference("f", DecimalType(38, 18))(),
+              AttributeReference("g", TimestampType)(),
+              AttributeReference("h", DateType)(),
+              AttributeReference("i", BooleanType)(),
+              AttributeReference("j", BinaryType)()),
+            child = child,
+            ioschema = serde
+          ),
+          df.select('a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j).collect())
+      }
+    }
+  }
+
+  testBasicInputDataTypesWith(defaultIOSchema, "no serde")
+
+  test("SPARK-32106: TRANSFORM should support more data types (interval, array, map, struct " +
+    "and udt) as input (no serde)") {
     assume(TestUtils.testCommandAvailable("python"))
     withTempView("v") {
       val df = Seq(
-        (1, "1", 1.0f, 1.0, 11.toByte, BigDecimal(1.0), new Timestamp(1),
-          new Date(2020, 7, 1), new CalendarInterval(7, 1, 1000), Array(0, 1, 2),
-          Map("a" -> 1), new TestUDT.MyDenseVector(Array(1, 2, 3)), new SimpleTuple(1, 1L)),
-        (2, "2", 2.0f, 2.0, 22.toByte, BigDecimal(2.0), new Timestamp(2),
-          new Date(2020, 7, 2), new CalendarInterval(7, 2, 2000), Array(3, 4, 5),
-          Map("b" -> 2), new TestUDT.MyDenseVector(Array(1, 2, 3)), new SimpleTuple(1, 1L)),
-        (3, "3", 3.0f, 3.0, 33.toByte, BigDecimal(3.0), new Timestamp(3),
-          new Date(2020, 7, 3), new CalendarInterval(7, 3, 3000), Array(6, 7, 8),
-          Map("c" -> 3), new TestUDT.MyDenseVector(Array(1, 2, 3)), new SimpleTuple(1, 1L))
-      ).toDF("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m")
-        .select('a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j, 'k, 'l, 'm,
-          struct('a, 'b).as("n"), unhex('a).as("o"), lit(true).as("p")
-        ) // Note column d's data type is Decimal(38, 18)
+        (new CalendarInterval(7, 1, 1000), Array(0, 1, 2), Map("a" -> 1), (1, 2),
+          new SimpleTuple(1, 1L)),
+        (new CalendarInterval(7, 2, 2000), Array(3, 4, 5), Map("b" -> 2), (3, 4),
+          new SimpleTuple(1, 1L)),
+        (new CalendarInterval(7, 3, 3000), Array(6, 7, 8), Map("c" -> 3), (5, 6),
+          new SimpleTuple(1, 1L))
+      ).toDF("a", "b", "c", "d", "e")
 
       // Can't support convert script output data to ArrayType/MapType/StructType now,
       // return these column still as string.
@@ -228,43 +273,18 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
             df.col("b").expr,
             df.col("c").expr,
             df.col("d").expr,
-            df.col("e").expr,
-            df.col("f").expr,
-            df.col("g").expr,
-            df.col("h").expr,
-            df.col("i").expr,
-            df.col("j").expr,
-            df.col("k").expr,
-            df.col("l").expr,
-            df.col("m").expr,
-            df.col("n").expr,
-            df.col("o").expr,
-            df.col("p").expr),
+            df.col("e").expr),
           script = "cat",
           output = Seq(
-            AttributeReference("a", IntegerType)(),
+            AttributeReference("a", CalendarIntervalType)(),
             AttributeReference("b", StringType)(),
-            AttributeReference("c", FloatType)(),
-            AttributeReference("d", DoubleType)(),
-            AttributeReference("e", ByteType)(),
-            AttributeReference("f", DecimalType(38, 18))(),
-            AttributeReference("g", TimestampType)(),
-            AttributeReference("h", DateType)(),
-            AttributeReference("i", CalendarIntervalType)(),
-            AttributeReference("j", StringType)(),
-            AttributeReference("k", StringType)(),
-            AttributeReference("l", StringType)(),
-            AttributeReference("m", new SimpleTupleUDT)(),
-            AttributeReference("n", StringType)(),
-            AttributeReference("o", BinaryType)(),
-            AttributeReference("p", BooleanType)()),
+            AttributeReference("c", StringType)(),
+            AttributeReference("d", StringType)(),
+            AttributeReference("e", new SimpleTupleUDT)()),
           child = child,
           ioschema = defaultIOSchema
         ),
-        df.select(
-          'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i,
-          'j.cast("string"), 'k.cast("string"),
-          'l.cast("string"), 'm, 'n.cast("string"), 'o, 'p).collect())
+        df.select('a, 'b.cast("string"), 'c.cast("string"), 'd.cast("string"), 'e).collect())
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 16e9014340244..060ab6a71af9e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -1063,6 +1063,9 @@ private[hive] trait HiveInspectors {
       case DateType => dateTypeInfo
       case TimestampType => timestampTypeInfo
       case NullType => voidTypeInfo
+      case dt =>
+        throw new AnalysisException("TRANSFORM with hive serde does not support " +
+          s"${dt.getClass.getSimpleName.replace("$", "")} as input data type")
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
index 4c53f17e33b3b..078ff70cb3150 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
@@ -185,6 +185,8 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T
     }
   }
 
+  testBasicInputDataTypesWith(hiveIOSchema, "hive serde")
+
   test("SPARK-32106: TRANSFORM supports complex data types type (hive serde)") {
     assume(TestUtils.testCommandAvailable("/bin/bash"))
     withTempView("v") {
@@ -258,8 +260,9 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T
          child = df.queryExecution.sparkPlan,
          ioschema = hiveIOSchema)
        SparkPlanTest.executePlan(plan, hiveContext)
-      }
-      assert(e1.getMessage.contains("scala.MatchError: CalendarIntervalType"))
+      }.getMessage
+      assert(e1.contains(
+        "TRANSFORM with hive serde does not support CalendarIntervalType as input data type"))
 
       val e2 = intercept[SparkException] {
         val plan = createScriptTransformationExec(
@@ -271,9 +274,9 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T
          child = df.queryExecution.sparkPlan,
          ioschema = hiveIOSchema)
        SparkPlanTest.executePlan(plan, hiveContext)
-      }
-      assert(e2.getMessage.contains(
-        "scala.MatchError: org.apache.spark.sql.types.TestUDT$MyDenseVectorUDT"))
+      }.getMessage
+      assert(e2.contains(
+        "TRANSFORM with hive serde does not support MyDenseVectorUDT as input data type"))
     }
   }
 
@@ -293,8 +296,9 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T
           |SELECT TRANSFORM(a, b) USING 'cat' AS (a, b)
           |FROM v
         """.stripMargin).collect()
-      }
-      assert(e1.getMessage.contains("scala.MatchError: CalendarIntervalType"))
+      }.getMessage
+      assert(e1.contains(
+        "TRANSFORM with hive serde does not support CalendarIntervalType as input data type"))
 
       val e2 = intercept[SparkException] {
         sql(
@@ -302,9 +306,9 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T
           |SELECT TRANSFORM(a, c) USING 'cat' AS (a, c)
           |FROM v
         """.stripMargin).collect()
-      }
-      assert(e2.getMessage.contains(
-        "scala.MatchError: org.apache.spark.sql.types.TestUDT$MyDenseVectorUDT"))
+      }.getMessage
+      assert(e2.contains(
+        "TRANSFORM with hive serde does not support MyDenseVectorUDT as input data type"))
     }
   }
 }