[SPARK-31937][SQL] Support processing ArrayType/MapType/StructType data using no-serde mode script transform #30957

Status: Closed (wants to merge 25 commits)

Commits (25), all by AngersZhuuuu:
adc9ded (Dec 29, 2020) [SPARK-31937][SQL] Support processing array and map type using spark …
6a7438b (Dec 29, 2020) Update CatalystTypeConverters.scala
d3b9cec (Dec 29, 2020) fix failed UT
fdd5225 (Dec 29, 2020) Update SparkScriptTransformationSuite.scala
aa16c8f (Dec 29, 2020) Update BaseScriptTransformationSuite.scala
092c927 (Dec 29, 2020) Update BaseScriptTransformationExec.scala
9761c0e (Dec 29, 2020) Merge branch 'master' into SPARK-31937
28ad7fa (Dec 29, 2020) Update BaseScriptTransformationSuite.scala
9ac75fc (Jan 4, 2021) Merge branch 'master' into SPARK-31937
33d8b5b (Jan 4, 2021) Update BaseScriptTransformationExec.scala
63f07eb (Feb 4, 2021) follow comment
b631b70 (Feb 4, 2021) Update BaseScriptTransformationExec.scala
b7e7f92 (Feb 5, 2021) follow comment
8dec5a1 (Feb 5, 2021) follow comment
529d54d (Feb 6, 2021) Update BaseScriptTransformationExec.scala
4f0e78f (Feb 6, 2021) Avoid construct JsonToStructs repeated
ed8c54c (Feb 6, 2021) remove unused UT
520f4b8 (Apr 16, 2021) Update sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScr…
97f9d58 (Apr 16, 2021) Merge branch 'master' into SPARK-31937
b5a4268 (Apr 18, 2021) [SPARK-35097][SQL] Add column name to SparkUpgradeException about anc…
76a746e (Apr 18, 2021) Revert "[SPARK-35097][SQL] Add column name to SparkUpgradeException a…
6aa05fc (Apr 19, 2021) fix UT
9e3f808 (Apr 19, 2021) Revert "fix UT"
3f51d27 (Apr 19, 2021) fix UT
adf8a66 (Apr 19, 2021) Update sql-migration-guide.md
sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala

@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkFiles, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, JsonToStructs, Literal, StructsToJson, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}

@@ -47,7 +47,14 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
   def ioschema: ScriptTransformationIOSchema

   protected lazy val inputExpressionsWithoutSerde: Seq[Expression] = {
-    input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone))
+    input.map { in =>
+      in.dataType match {
+        case _: ArrayType | _: MapType | _: StructType =>
+          new StructsToJson(ioschema.inputSerdeProps.toMap, in)
+            .withTimeZone(conf.sessionLocalTimeZone)
+        case _ => Cast(in, StringType).withTimeZone(conf.sessionLocalTimeZone)
+      }
+    }
   }

   override def producedAttributes: AttributeSet = outputSet -- inputSet
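
The hunk above is the input side of the feature: columns whose type is ArrayType, MapType, or StructType are now serialized with StructsToJson instead of being cast to a string, so the child script receives JSON text. A minimal sketch of the same conversion through the public to_json function (the DataFrame surface over StructsToJson), assuming a spark-shell session on a 3.x build; the column names here are illustrative:

import org.apache.spark.sql.functions.to_json
import spark.implicits._

// One row holding an array, a map, and a struct, mirroring the suite's data.
val df = Seq((Array(1, 2), Map("a" -> 1), (1, "x"))).toDF("arr", "m", "s")

// Roughly what the script reads on stdin for these columns (tab-separated
// under the default no-serde IO schema): [1,2], {"a":1}, {"_1":1,"_2":"x"}.
df.select(to_json($"arr"), to_json($"m"), to_json($"s")).show(truncate = false)
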
@@ -220,6 +227,11 @@
       case CalendarIntervalType => wrapperConvertException(
         data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
         converter)
+      case _: ArrayType | _: MapType | _: StructType =>
+        val complexTypeFactory = JsonToStructs(attr.dataType,
+          ioschema.outputSerdeProps.toMap, Literal(null), Some(conf.sessionLocalTimeZone))
+        wrapperConvertException(data =>
+          complexTypeFactory.nullSafeEval(UTF8String.fromString(data)), any => any)
       case udt: UserDefinedType[_] =>
         wrapperConvertException(data => udt.deserialize(data), converter)
       case dt =>
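
The output side, in the hunk above, builds one JsonToStructs per complex-typed output attribute (hoisted out of the per-line path by commit 4f0e78f) and evaluates it against every line the script emits. A sketch of that decode step against the same Catalyst internals, assuming spark-catalyst is on the classpath; the schema, time zone, and input line are illustrative:

import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, Literal}
import org.apache.spark.sql.types.{ArrayType, IntegerType}
import org.apache.spark.unsafe.types.UTF8String

// Constructed once per output attribute, not once per row; Literal(null) is a
// placeholder child because the expression is driven through nullSafeEval.
val parser = JsonToStructs(ArrayType(IntegerType), Map.empty[String, String],
  Literal(null), Some("UTC"))

// Applied to each line read back from the script: JSON text in, a Catalyst
// ArrayData holding 1, 2, 3 out.
val parsed = parser.nullSafeEval(UTF8String.fromString("[1,2,3]"))
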
sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala

@@ -302,14 +302,16 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils
          script = "cat",
          output = Seq(
            AttributeReference("a", CalendarIntervalType)(),
-           AttributeReference("b", StringType)(),
-           AttributeReference("c", StringType)(),
-           AttributeReference("d", StringType)(),
+           AttributeReference("b", ArrayType(IntegerType))(),
+           AttributeReference("c", MapType(StringType, IntegerType))(),
+           AttributeReference("d", StructType(
+             Array(StructField("_1", IntegerType),
+               StructField("_2", IntegerType))))(),
            AttributeReference("e", new SimpleTupleUDT)()),
          child = child,
          ioschema = defaultIOSchema
        ),
-       df.select('a, 'b.cast("string"), 'c.cast("string"), 'd.cast("string"), 'e).collect())
+       df.select('a, 'b, 'c, 'd, 'e).collect())
    }
  }

@@ -471,6 +473,60 @@
     }
   }

+  test("SPARK-31937: Script transform support ArrayType/MapType/StructType (no serde)") {
+    assume(TestUtils.testCommandAvailable("python"))
+    withTempView("v") {
+      val df = Seq(
+        (Array(0, 1, 2), Array(Array(0, 1), Array(2)),
+          Map("a" -> 1), Map("b" -> Array("a", "b"))),
+        (Array(3, 4, 5), Array(Array(3, 4), Array(5)),
+          Map("b" -> 2), Map("c" -> Array("c", "d"))),
+        (Array(6, 7, 8), Array(Array(6, 7), Array(8)),
+          Map("c" -> 3), Map("d" -> Array("e", "f")))
+      ).toDF("a", "b", "c", "d")
+        .select('a, 'b, 'c, 'd,
+          struct('a, 'b).as("e"),
+          struct('a, 'd).as("f"),
+          struct(struct('a, 'b), struct('a, 'd)).as("g")
+        )
+
+      checkAnswer(
+        df,
+        (child: SparkPlan) => createScriptTransformationExec(
+          input = Seq(
+            df.col("a").expr,
+            df.col("b").expr,
+            df.col("c").expr,
+            df.col("d").expr,
+            df.col("e").expr,
+            df.col("f").expr,
+            df.col("g").expr),
+          script = "cat",
+          output = Seq(
+            AttributeReference("a", ArrayType(IntegerType))(),
+            AttributeReference("b", ArrayType(ArrayType(IntegerType)))(),
+            AttributeReference("c", MapType(StringType, IntegerType))(),
+            AttributeReference("d", MapType(StringType, ArrayType(StringType)))(),
+            AttributeReference("e", StructType(
+              Array(StructField("a", ArrayType(IntegerType)),
+                StructField("b", ArrayType(ArrayType(IntegerType))))))(),
+            AttributeReference("f", StructType(
+              Array(StructField("a", ArrayType(IntegerType)),
+                StructField("d", MapType(StringType, ArrayType(StringType))))))(),
+            AttributeReference("g", StructType(
+              Array(StructField("col1", StructType(
+                Array(StructField("a", ArrayType(IntegerType)),
+                  StructField("b", ArrayType(ArrayType(IntegerType)))))),
+                StructField("col2", StructType(
+                  Array(StructField("a", ArrayType(IntegerType)),
+                    StructField("d", MapType(StringType, ArrayType(StringType)))))))))()),
+          child = child,
+          ioschema = defaultIOSchema
+        ),
+        df.select('a, 'b, 'c, 'd, 'e, 'f, 'g).collect())
+    }
+  }
+
   test("SPARK-33934: Add SparkFile's root dir to env property PATH") {
     assume(TestUtils.testCommandAvailable("python"))
     val scriptFilePath = copyAndGetResourceFile("test_script.py", ".py").getAbsoluteFile
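
The new test above asserts a full round trip: complex columns go out to the script as JSON, pass through cat untouched, and are parsed back into the declared output types. A hypothetical spark-shell session on a build that includes this patch (requires cat on PATH) shows the same behavior end to end:

spark.sql(
  """
    |SELECT TRANSFORM(a, b, c)
    |USING 'cat' AS (a array<int>, b map<string, int>, c struct<col1:int, col2:string>)
    |FROM VALUES (array(1, 2), map('k', 1), struct(1, 'x')) t(a, b, c)
  """.stripMargin).show(truncate = false)
// Expected shape of the result (rendering is approximate):
// +------+--------+------+
// |a     |b       |c     |
// +------+--------+------+
// |[1, 2]|{k -> 1}|{1, x}|
// +------+--------+------+
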
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala

@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution

-import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.test.SharedSparkSession

@@ -59,44 +59,4 @@ class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with SharedSparkSession
       assert(e.contains("TRANSFORM with serde is only supported in hive mode"))
     }
   }
-
test("SPARK-32106: TRANSFORM doesn't support ArrayType/MapType/StructType " +
"as output data type (no serde)") {
assume(TestUtils.testCommandAvailable("/bin/bash"))
// check for ArrayType
val e1 = intercept[SparkException] {
sql(
"""
|SELECT TRANSFORM(a)
|USING 'cat' AS (a array<int>)
|FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
""".stripMargin).collect()
}.getMessage
assert(e1.contains("SparkScriptTransformation without serde does not support" +
" ArrayType as output data type"))

// check for MapType
val e2 = intercept[SparkException] {
sql(
"""
|SELECT TRANSFORM(b)
|USING 'cat' AS (b map<int, string>)
|FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
""".stripMargin).collect()
}.getMessage
assert(e2.contains("SparkScriptTransformation without serde does not support" +
" MapType as output data type"))

// check for StructType
val e3 = intercept[SparkException] {
sql(
"""
|SELECT TRANSFORM(c)
|USING 'cat' AS (c struct<col1:int, col2:string>)
|FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
""".stripMargin).collect()
}.getMessage
assert(e3.contains("SparkScriptTransformation without serde does not support" +
" StructType as output data type"))
}
}
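
These negative tests are deleted because the queries they guarded against now succeed. For example, the ArrayType query above, hypothetically rerun in spark-shell on a build with this patch:

// Previously failed with "SparkScriptTransformation without serde does not
// support ArrayType as output data type"; now returns a typed array column.
sql(
  """
    |SELECT TRANSFORM(a)
    |USING 'cat' AS (a array<int>)
    |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
  """.stripMargin).show()
// +------+
// |     a|
// +------+
// |[1, 1]|
// +------+
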