[SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core #29085
Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala
can you explain the serde part? How can we do script transformation in sql/core without the hive serde lib?
In most cases we won't use a script with a serde, so we can implement script transform in sql/core first with only the default format. In our product we don't have any transform scripts that use a serde.
@alfozan Hi, I know Facebook uses script transform a lot; in your case, do you use script transform with a serde?
/**
 * The wrapper class of Spark script transformation input and output schema properties
 */
case class SparkScriptIOSchema (
Why is this class so big while it doesn't support hive serde?
For this, I think we should change it after we decide whether serde support needs to be implemented for script transform in sql/core.
The implementation here offers very limited support for the ROW FORMAT DELIMITED format - it does not rely on Hive's SerDe classes.
A complete implementation (a SerDe class for ROW FORMAT DELIMITED) can be added later and will live in the same folder.
#29085 (comment)
At least we should define how to convert catalyst values to strings, right?
So we need to handle string format in
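A sketch of what such a conversion could look like — hypothetical Python, not Spark's Scala code; the per-type rules here (Hive-style booleans, ISO dates, `\N` for NULL) are assumptions for illustration only:

```python
from datetime import date, datetime

def to_transform_string(value, null_str="\\N"):
    """Convert one catalyst-like value into the string handed to the
    transform script. Each type chooses its own textual form."""
    if value is None:
        return null_str                      # NULL marker, assumed '\N'
    if isinstance(value, bool):
        return "true" if value else "false"  # Hive-style boolean text
    if isinstance(value, datetime):          # check datetime before date:
        return value.isoformat(sep=" ")      # datetime subclasses date
    if isinstance(value, date):
        return value.isoformat()             # '2020-07-14'
    return str(value)

print(to_transform_string(True))              # true
print(to_transform_string(date(2020, 7, 14))) # 2020-07-14
print(to_transform_string(None))              # \N
```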
Test build #125768 has finished for PR 29085 at commit
Test build #125775 has finished for PR 29085 at commit
Can we use |
Nice advice! Updated.
@AngersZhuuuu Yes, we implemented two native SerDe classes (SimpleSerDe for ROW FORMAT DELIMITED and DelimitedJSONSerDe for the JSON variant). For more, see https://www.slideshare.net/databricks/powering-custom-apps-at-facebook-using-spark-script-transformation slide 34.
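To make the difference between the two concrete, here is an illustrative Python sketch (not Facebook's implementation — only the SerDe names come from the comment above): a plain delimited rendering versus a per-field JSON rendering that survives nested types.

```python
import json

def simple_serde_row(row, delim="\t", null_str="\\N"):
    """SimpleSerDe-style: every field rendered as a plain string.
    Fine for scalars, ambiguous for arrays/maps/structs."""
    return delim.join(null_str if v is None else str(v) for v in row)

def delimited_json_row(row, delim="\t"):
    """DelimitedJSONSerDe-style: every field rendered as JSON, so the
    consuming script can parse nested values unambiguously."""
    return delim.join(json.dumps(v) for v in row)

row = (1, "a", [1, 2], {"k": "v"})
print(simple_serde_row(row))    # nested fields become Python reprs
print(delimited_json_row(row))  # nested fields stay parseable JSON
```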
import spark.implicits._

var noSerdeIOSchema: BaseScriptTransformIOSchema = _
can we make it a val or def and ask child to override it?
child: SparkPlan,
ioschema: BaseScriptTransformIOSchema): BaseScriptTransformationExec = {
  scriptType.toUpperCase(Locale.ROOT) match {
    case "SPARK" => new SparkScriptTransformationExec(
instead of asking the child to override scriptType, we should just ask child to implement a method to create BaseScriptTransformationExec. Then we can move the base suite to sql/core.
There are inheritance conflicts between SparkPlanTest and SQLTestUtils.
cc @cloud-fan when moving BaseScriptTransformationSuite to sql/core there are method conflicts like below; what is the best way to solve this?
[error] /Users/angerszhu/Documents/project/AngersZhu/spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala:31: overriding method spark in trait SharedSparkSessionBase of type => org.apache.spark.sql.SparkSession;
[error] value spark in trait TestHiveSingleton of type org.apache.spark.sql.SparkSession has weaker access privileges; it should be public
[error] class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with TestHiveSingleton {
[error] ^
What's the behavior of Hive if the script transformation doesn't specify a serde? Does Hive pick a default serde, or does it well-define the non-serde behavior?
Yeah, that's what I want to do next; glad to hear that you will share your code.
Test build #125827 has finished for PR 29085 at commit
In the current code, when we don't specify a serde with TRANSFORM, it will use LazySimpleSerDe: spark/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala Lines 717 to 723 in d6a68e0
It means that only when you write a wrong serde, and ScriptTransformationExec can't find the corresponding serde class, will it execute the code at spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala Lines 236 to 238 in d6a68e0
spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala Lines 175 to 187 in d6a68e0
As the comment at #29085 (comment) says, without a serde we can't handle the input data string correctly; for the same reason we can't handle output data either. In this #29085 (comment) Jenkins result you can see the output data type problem.
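The output-side problem can be illustrated with a hypothetical sketch (Python, not Spark's code; the type names only mirror SQL types): without a serde, each declared output type needs its own string parser, and anything without one — the complex types — cannot be recovered.

```python
def parse_output_field(s, data_type, null_str="\\N"):
    """Parse one field of the script's string output back into a typed
    value. Types without a registered parser cannot be recovered."""
    if s == null_str:
        return None
    parsers = {
        "int": int,
        "long": int,
        "double": float,
        "boolean": lambda x: x.lower() == "true",
        "string": str,
    }
    if data_type not in parsers:
        # e.g. array<int>, map<string,int>: the no-serde path has no way
        # to rebuild a nested value from a flat string.
        raise ValueError(f"no-serde output does not support {data_type}")
    return parsers[data_type](s)

print(parse_output_field("42", "int"))      # 42
print(parse_output_field("\\N", "double"))  # None
```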
In the current PR I just add a temporary method and wait for @alfozan's Spark serde. As far as I know,
so eventually we don't need to use
Yes, but in this step we should do this to keep the data and UTs right. Need to
Test build #125840 has finished for PR 29085 at commit
Test build #126326 has finished for PR 29085 at commit
Test build #126329 has finished for PR 29085 at commit
Test build #126333 has finished for PR 29085 at commit
Test build #126340 has finished for PR 29085 at commit
Test build #126345 has finished for PR 29085 at commit
@@ -744,8 +744,29 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    selectClause.hints.asScala.foldRight(withWindow)(withHints)
  }

  // Decode and input/output format.
  type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
Format -> ScriptIOFormat? Then, could you make the comment above clearer?
Done
@@ -1031,4 +1031,96 @@ class PlanParserSuite extends AnalysisTest {
    assertEqual("select a, b from db.c;;;", table("db", "c").select('a, 'b))
    assertEqual("select a, b from db.c; ;; ;", table("db", "c").select('a, 'b))
  }

  test("SPARK-32106: TRANSFORM without serde") {
TRANSFORM without serde -> TRANSFORM plan?
Also, could you check ROW FORMAT SERDE, too?
Add UT
object SparkScripts extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.ScriptTransformation(input, script, output, child, ioschema)
        if ioschema.inputSerdeClass.isEmpty && ioschema.outputSerdeClass.isEmpty =>
Do we need to check this here? It seems it has already been checked in https://github.com/apache/spark/pull/29085/files#diff-9847f5cef7cf7fbc5830fbc6b779ee10R783-R784?
Yeah, it's not needed now.
NULL DEFINED AS 'NULL'
FROM t;
remove blank.
Done
NULL DEFINED AS 'NULL'
FROM t;

-- SPARK-31937 transform with defined row format delimit
Is this JIRA related to this query? I read it, but I'm not sure about the relationship. What kind of exception does this query throw?
It's a test for supporting Array/Map/Struct.
Removed for now; I'll add it in that PR.
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
import org.apache.spark.sql.types.{DataType, StringType}
StringType is not used.
Done
  TaskContext.get(),
  hadoopConf
)

private def initSerDe(
Sorry for the confusion, but on second thought it's better to pull the hive-serde-related functions out of HiveScriptTransformationExec and create a companion object holding them, for readability: maropu@972775b. WDYT?
Agreed; let's make ScriptTransformExec only handle data processing.
}

test("SPARK-30973: TRANSFORM should wait for the termination of the script (with serde)") {
test("SPARK-32106: TRANSFORM supports complex data types end to end (hive serde) ") {
remove the space at the end.
Done
@@ -1063,6 +1063,9 @@ private[hive] trait HiveInspectors {
    case DateType => dateTypeInfo
    case TimestampType => timestampTypeInfo
    case NullType => voidTypeInfo
    case dt =>
      throw new AnalysisException("TRANSFORM with hive serde does not support " +
I think HiveInspectors is not related to TRANSFORM, so could you make the error message more general?
Done
  }
}

test("TRANSFORM doesn't support ArrayType/MapType/StructType as output data type (no serde)") {
SPARK-32106:
Done
All done
case udt: UserDefinedType[_] =>
  wrapperConvertException(data => udt.deserialize(data), converter)
case dt =>
  throw new SparkException("TRANSFORM without serde does not support " +
nit: TRANSFORM -> s"$nodeName...
Done
@@ -1063,6 +1063,9 @@ private[hive] trait HiveInspectors {
    case DateType => dateTypeInfo
    case TimestampType => timestampTypeInfo
    case NullType => voidTypeInfo
    case dt =>
      throw new AnalysisException("HiveInspectors does not support convert " +
nit: s"${dt.catalogString}" cannot be converted to Hive TypeInfo"
Same reason as #29085 (comment).
  wrapperConvertException(data => udt.deserialize(data), converter)
case dt =>
  throw new SparkException("TRANSFORM without serde does not support " +
    s"${dt.getClass.getSimpleName} as output data type")
dt.getClass.getSimpleName -> dt.catalogString
It is not general; for ArrayType it will show array<int>, and for StructType it will show struct<string, int, etc...>. WDYT?
Looks almost okay now, so could you split this PR into pieces? I think it's somewhat big for review. For example:
WDYT?
Yeah, I'll raise the PRs one by one.
Test build #126372 has finished for PR 29085 at commit
Test build #126376 has finished for PR 29085 at commit
Test build #126373 has finished for PR 29085 at commit
As mentioned above (adding Spark native SerDes), I'll open a PR once part 1 is merged.
retest this please
Test build #126818 has finished for PR 29085 at commit
# What changes were proposed in this pull request?
This PR comes from the comment #29085 (comment):
- Extract a common script IOSchema, `ScriptTransformationIOSchema`
- To avoid repeated judgement, extract the process-output-row methods `createOutputIteratorWithoutSerde` && `createOutputIteratorWithSerde`
- Add a default no-serde IO schema, `ScriptTransformationIOSchema.defaultIOSchema`

### Why are the changes needed?
Refactor code

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
NO

Closes #29199 from AngersZhuuuu/spark-32105-followup.
Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cc @wangyum too
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
### What changes were proposed in this pull request?
* Implement `SparkScriptTransformationExec` based on `BaseScriptTransformationExec`
* Implement `SparkScriptTransformationWriterThread` based on `BaseScriptTransformationWriterThread` for writing data
* Add rule `SparkScripts` to support converting a script LogicalPlan to a SparkPlan in Spark SQL (without hive mode)
* Add `SparkScriptTransformationSuite` to test Spark-specific cases
* Add tests in `SQLQueryTestSuite`

And we will close #29085.

### Why are the changes needed?
Support using Script Transform without Hive

### Does this PR introduce _any_ user-facing change?
Users can use Script Transformation without Hive in no-serde mode. Such as:

**default no serde**
```
SELECT TRANSFORM(a, b, c)
USING 'cat' AS (a int, b string, c long)
FROM testData
```

**no serde with spec ROW FORMAT DELIMITED**
```
SELECT TRANSFORM(a, b, c)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY '\u0002'
MAP KEYS TERMINATED BY '\u0003'
LINES TERMINATED BY '\n'
NULL DEFINED AS 'null'
USING 'cat' AS (a, b, c)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY '\u0004'
MAP KEYS TERMINATED BY '\u0005'
LINES TERMINATED BY '\n'
NULL DEFINED AS 'NULL'
FROM testData
```

### How was this patch tested?
Added UT

Closes #29414 from AngersZhuuuu/SPARK-32106-MINOR.
Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
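The two ROW FORMAT DELIMITED clauses in the example above are independent because the user script sits between them: the first describes what the script reads, the second what Spark parses back. A rough Python simulation of that pipeline (an illustration, not Spark's implementation; it shells out to the script the way TRANSFORM does):

```python
import subprocess

def run_transform(rows, script, in_delim="\t", out_delim="\t", null_str="NULL"):
    """Serialize rows with the input row format, pipe them through the
    script, and parse stdout with the (possibly different) output format."""
    payload = "\n".join(
        in_delim.join(null_str if v is None else str(v) for v in row)
        for row in rows
    )
    proc = subprocess.run(script, input=payload, shell=True,
                          capture_output=True, text=True, check=True)
    return [
        [None if f == null_str else f for f in line.split(out_delim)]
        for line in proc.stdout.splitlines()
    ]

# 'cat' echoes rows unchanged; a script that rewrites tabs to commas
# forces the output clause to declare a different field delimiter.
print(run_transform([(1, "a"), (2, None)], "cat"))
print(run_transform([(1, "a")], "tr '\\t' ','", out_delim=","))
```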
What changes were proposed in this pull request?
Implement script transformation in the sql/core module:
- Rename hive/execution/ScriptTransformationExec to hive/execution/HiveScriptTransformationExec (rename file)
- Implement SparkScriptTransformationExec based on BaseScriptTransformationExec
- Implement SparkScriptTransformationWriterThread based on BaseScriptTransformationWriterThread for writing data
- Add rule SparkScripts to support converting a script LogicalPlan to a SparkPlan in Spark SQL (without hive mode)
- Add BaseScriptTransformationSuite and SparkScriptTransformationSuite
In this PR, we offer only very limited support for the ROW FORMAT DELIMITED format - it does not rely on Hive's SerDe classes: we cast all data to String and pass it to the script, and inside the script we always process data as strings. As with the current ScriptTransformation with the default serde, we can't support complex data types (Map/Array/Struct) or Spark's special data types (TimestampType/CalendarIntervalType) either.
After this PR, @alfozan will raise a new PR to add two native SerDe classes (SimpleSerDe for ROW FORMAT DELIMITED and DelimitedJSONSerDe for the JSON variant); that PR will support handling complex data types and Spark's own special types.
One more thing to explain: in the current code, the way we choose between the default format and a serde is not final; it needs more discussion in the next PR together with Spark's own serde.
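The complex-type limitation described above can be seen concretely in a short sketch (plain Python, illustrative only): casting a nested value to a string is not a reversible encoding, so the no-serde reader on the output side has no way to rebuild the typed value.

```python
# Casting to string flattens nested values into an opaque repr that the
# no-serde reader cannot interpret: it only splits on field delimiters.
value = {"a": [1, 2], "b": None}
as_string = str(value)
print(as_string)  # a string, not a map<string,array<int>>

# The reader sees one string field; there is no parser that maps the
# repr back to the nested value, so type information is lost.
row = "id1\t" + as_string
fields = row.split("\t")
print(len(fields), type(fields[1]).__name__)
```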
Why are the changes needed?
Support running scripts in Spark SQL without Hive.
Does this PR introduce any user-facing change?
Users can use script transform without Hive support.
How was this patch tested?
Added UT