Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-31854][SQL] Invoke in MapElementsExec should not propagate null #28681

Closed
wants to merge 5 commits into from

Conversation

maropu
Copy link
Member

@maropu maropu commented May 31, 2020

What changes were proposed in this pull request?

This PR intends to fix a bug of Dataset.map below when the whole-stage codegen enabled;

scala> val ds = Seq(1.asInstanceOf[Integer], null.asInstanceOf[Integer]).toDS()

scala> sql("SET spark.sql.codegen.wholeStage=true")

scala> ds.map(v=>(v,v)).explain
== Physical Plan ==
*(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1.intValue AS _1#69, assertnotnull(input[0, scala.Tuple2, true])._2.intValue AS _2#70]
+- *(1) MapElements <function1>, obj#68: scala.Tuple2
   +- *(1) DeserializeToObject staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, value#1, true, false), obj#67: java.lang.Integer
      +- LocalTableScan [value#1]

// `AssertNotNull` in `SerializeFromObject` will fail;
scala> ds.map(v => (v, v)).show()
java.lang.NullPointerException: Null value appeared in non-nullable fails:
top level Product input object
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

// When the whole-stage codegen disabled, the query works well;
scala> sql("SET spark.sql.codegen.wholeStage=false")
scala> ds.map(v=>(v,v)).show()
+----+----+
|  _1|  _2|
+----+----+
|   1|   1|
|null|null|
+----+----+

A root cause is that Invoke used in MapElementsExec propagates input null, and then AssertNotNull in SerializeFromObject fails because a top-level row becomes null. So, MapElementsExec should not return null but (null, null).

NOTE: the generated code of the query above in the current master;

/* 033 */   private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
/* 034 */     boolean mapelements_isNull_1 = true;
/* 035 */     scala.Tuple2 mapelements_value_1 = null;
/* 036 */     if (!false) {
/* 037 */       mapelements_resultIsNull_0 = false;
/* 038 */
/* 039 */       if (!mapelements_resultIsNull_0) {
/* 040 */         mapelements_resultIsNull_0 = mapelements_exprIsNull_0_0;
/* 041 */         mapelements_mutableStateArray_0[0] = mapelements_expr_0_0;
/* 042 */       }
/* 043 */
/* 044 */       mapelements_isNull_1 = mapelements_resultIsNull_0;
/* 045 */       if (!mapelements_isNull_1) {
/* 046 */         Object mapelements_funcResult_0 = null;
/* 047 */         mapelements_funcResult_0 = ((scala.Function1) references[1] /* literal */).apply(mapelements_mutableStateArray_0[0]);
/* 048 */
/* 049 */         if (mapelements_funcResult_0 != null) {
/* 050 */           mapelements_value_1 = (scala.Tuple2) mapelements_funcResult_0;
/* 051 */         } else {
/* 052 */           mapelements_isNull_1 = true;
/* 053 */         }
/* 054 */
/* 055 */       }
/* 056 */     }
/* 057 */
/* 058 */     serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
/* 059 */
/* 060 */   }

The generated code w/ this fix;

/* 032 */   private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
/* 033 */     boolean mapelements_isNull_1 = true;
/* 034 */     scala.Tuple2 mapelements_value_1 = null;
/* 035 */     if (!false) {
/* 036 */       mapelements_mutableStateArray_0[0] = mapelements_expr_0_0;
/* 037 */
/* 038 */       mapelements_isNull_1 = false;
/* 039 */       if (!mapelements_isNull_1) {
/* 040 */         Object mapelements_funcResult_0 = null;
/* 041 */         mapelements_funcResult_0 = ((scala.Function1) references[1] /* literal */).apply(mapelements_mutableStateArray_0[0]);
/* 042 */
/* 043 */         if (mapelements_funcResult_0 != null) {
/* 044 */           mapelements_value_1 = (scala.Tuple2) mapelements_funcResult_0;
/* 045 */           mapelements_isNull_1 = false;
/* 046 */         } else {
/* 047 */           mapelements_isNull_1 = true;
/* 048 */         }
/* 049 */
/* 050 */       }
/* 051 */     }
/* 052 */
/* 053 */     serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
/* 054 */
/* 055 */   }

Why are the changes needed?

Bugfix.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added tests.

@SparkQA
Copy link

SparkQA commented May 31, 2020

Test build #123327 has finished for PR 28681 at commit 6de753d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu changed the title [SPARK-31854][SQL] Invoke in MapElementsExec should respect propagateNull [SPARK-31854][SQL] Invoke in MapElementsExec should not propagate null May 31, 2020
@SparkQA
Copy link

SparkQA commented May 31, 2020

Test build #123339 has finished for PR 28681 at commit 8e88d73.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented May 31, 2020

Test build #123343 has finished for PR 28681 at commit 89eee82.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

s"${ev.value} = (${CodeGenerator.boxedType(javaType)}) $funcResult;"
s"""
${ev.value} = (${CodeGenerator.boxedType(javaType)}) $funcResult;
${ev.isNull} = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we can remove ${ev.isNull} = false;

} else {
s"""
if ($funcResult != null) {
${ev.value} = (${CodeGenerator.boxedType(javaType)}) $funcResult;
${ev.isNull} = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as this

@maropu
Copy link
Member Author

maropu commented May 31, 2020

cc: @cloud-fan @viirya

case m: MapFunction[_, _] => classOf[MapFunction[_, _]] -> "call"
case _ => FunctionUtils.getFunctionOneName(outputObjectType, child.output(0).dataType)
}
val funcObj = Literal.create(func, ObjectType(funcClass))
val callFunc = Invoke(funcObj, methodName, outputObjectType, child.output)
val callFunc = Invoke(funcObj, funcName, outputObjectType, child.output, propagateNull = false)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the name methodName->funcName so as to put the code in a single line. plz let me know if I need to revert this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks okay for that purpose.

@@ -1916,6 +1916,17 @@ class DatasetSuite extends QueryTest
assert(df1.semanticHash !== df3.semanticHash)
assert(df3.semanticHash === df4.semanticHash)
}

test("SPARK-31854: Invoke in MapElementsExec should not propagate null") {
spark.conf.set("spark.sql.codegen.wholeStage", false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need to set this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ur, I forgot to remove this...

Comment on lines 206 to 219
test("SPARK-31854: Invoke in MapElementsExec should not propagate null") {
val targetObject = new InvokeTargetClass
val funcClass = classOf[InvokeTargetClass]
val funcObj = Literal.create(targetObject, ObjectType(funcClass))
val inputInt = Seq(BoundReference(0, ObjectType(classOf[Any]), true))
val outputType = ObjectType(classOf[(Any, Any)])
val inputRow = InternalRow.fromSeq(Seq(null.asInstanceOf[java.lang.Integer]))
val createExpr = (propagateNull: Boolean) => {
Invoke(funcObj, "mapFunc", outputType, inputInt, propagateNull)
}
checkObjectExprEvaluation(createExpr(true), null, inputRow)
checkObjectExprEvaluation(createExpr(false), (null, null), inputRow)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test is already passed before this change, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks okay as I think we should not short circuit for null inputs (i.e. propagateNull = true) in MapElementsExec. The given function isn't necessarily returning null in all cases.

@SparkQA
Copy link

SparkQA commented Jun 1, 2020

Test build #123352 has finished for PR 28681 at commit 6008bc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 1, 2020

Test build #123353 has finished for PR 28681 at commit d917af7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 1, 2020

Test build #123354 has finished for PR 28681 at commit fe943f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master/3.0!

@cloud-fan cloud-fan closed this in b806fc4 Jun 1, 2020
@maropu
Copy link
Member Author

maropu commented Jun 1, 2020

Thanks, all!

cloud-fan pushed a commit that referenced this pull request Jun 1, 2020
This PR intends to fix a bug of `Dataset.map` below when the whole-stage codegen enabled;
```
scala> val ds = Seq(1.asInstanceOf[Integer], null.asInstanceOf[Integer]).toDS()

scala> sql("SET spark.sql.codegen.wholeStage=true")

scala> ds.map(v=>(v,v)).explain
== Physical Plan ==
*(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1.intValue AS _1#69, assertnotnull(input[0, scala.Tuple2, true])._2.intValue AS _2#70]
+- *(1) MapElements <function1>, obj#68: scala.Tuple2
   +- *(1) DeserializeToObject staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, value#1, true, false), obj#67: java.lang.Integer
      +- LocalTableScan [value#1]

// `AssertNotNull` in `SerializeFromObject` will fail;
scala> ds.map(v => (v, v)).show()
java.lang.NullPointerException: Null value appeared in non-nullable fails:
top level Product input object
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

// When the whole-stage codegen disabled, the query works well;
scala> sql("SET spark.sql.codegen.wholeStage=false")
scala> ds.map(v=>(v,v)).show()
+----+----+
|  _1|  _2|
+----+----+
|   1|   1|
|null|null|
+----+----+
```
A root cause is that `Invoke` used in `MapElementsExec` propagates input null, and then [AssertNotNull](https://github.com/apache/spark/blob/1b780f364bfbb46944fe805a024bb6c32f5d2dde/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L253-L255) in `SerializeFromObject` fails because a top-level row becomes null. So, `MapElementsExec` should not return `null` but `(null, null)`.

NOTE: the generated code of the query above in the current master;
```
/* 033 */   private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
/* 034 */     boolean mapelements_isNull_1 = true;
/* 035 */     scala.Tuple2 mapelements_value_1 = null;
/* 036 */     if (!false) {
/* 037 */       mapelements_resultIsNull_0 = false;
/* 038 */
/* 039 */       if (!mapelements_resultIsNull_0) {
/* 040 */         mapelements_resultIsNull_0 = mapelements_exprIsNull_0_0;
/* 041 */         mapelements_mutableStateArray_0[0] = mapelements_expr_0_0;
/* 042 */       }
/* 043 */
/* 044 */       mapelements_isNull_1 = mapelements_resultIsNull_0;
/* 045 */       if (!mapelements_isNull_1) {
/* 046 */         Object mapelements_funcResult_0 = null;
/* 047 */         mapelements_funcResult_0 = ((scala.Function1) references[1] /* literal */).apply(mapelements_mutableStateArray_0[0]);
/* 048 */
/* 049 */         if (mapelements_funcResult_0 != null) {
/* 050 */           mapelements_value_1 = (scala.Tuple2) mapelements_funcResult_0;
/* 051 */         } else {
/* 052 */           mapelements_isNull_1 = true;
/* 053 */         }
/* 054 */
/* 055 */       }
/* 056 */     }
/* 057 */
/* 058 */     serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
/* 059 */
/* 060 */   }
```

The generated code w/ this fix;
```
/* 032 */   private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
/* 033 */     boolean mapelements_isNull_1 = true;
/* 034 */     scala.Tuple2 mapelements_value_1 = null;
/* 035 */     if (!false) {
/* 036 */       mapelements_mutableStateArray_0[0] = mapelements_expr_0_0;
/* 037 */
/* 038 */       mapelements_isNull_1 = false;
/* 039 */       if (!mapelements_isNull_1) {
/* 040 */         Object mapelements_funcResult_0 = null;
/* 041 */         mapelements_funcResult_0 = ((scala.Function1) references[1] /* literal */).apply(mapelements_mutableStateArray_0[0]);
/* 042 */
/* 043 */         if (mapelements_funcResult_0 != null) {
/* 044 */           mapelements_value_1 = (scala.Tuple2) mapelements_funcResult_0;
/* 045 */           mapelements_isNull_1 = false;
/* 046 */         } else {
/* 047 */           mapelements_isNull_1 = true;
/* 048 */         }
/* 049 */
/* 050 */       }
/* 051 */     }
/* 052 */
/* 053 */     serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
/* 054 */
/* 055 */   }
```

Bugfix.

No.

Added tests.

Closes #28681 from maropu/SPARK-31854.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b806fc4)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@dongjoon-hyun
Copy link
Member

Hi, @cloud-fan . This is a bug since Apache Spark 2.0.x. Could you backport this to branch-2.4 please, too?

@cloud-fan
Copy link
Contributor

There are several conflicts in the test suite when I backport to 3.0, I believe there are more conflicts in 2.4. @maropu can you send a new PR for 2.4? thanks!

@dongjoon-hyun
Copy link
Member

Oh, got it. Thanks.

@HyukjinKwon
Copy link
Member

+1 for backporting. LGTM too

@maropu
Copy link
Member Author

maropu commented Jun 1, 2020

Sure, I will.

HyukjinKwon pushed a commit that referenced this pull request Jun 1, 2020
…e null

### What changes were proposed in this pull request?

This PR intends to fix a bug of `Dataset.map` below when the whole-stage codegen enabled;
```
scala> val ds = Seq(1.asInstanceOf[Integer], null.asInstanceOf[Integer]).toDS()

scala> sql("SET spark.sql.codegen.wholeStage=true")

scala> ds.map(v=>(v,v)).explain
== Physical Plan ==
*(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1.intValue AS _1#69, assertnotnull(input[0, scala.Tuple2, true])._2.intValue AS _2#70]
+- *(1) MapElements <function1>, obj#68: scala.Tuple2
   +- *(1) DeserializeToObject staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, value#1, true, false), obj#67: java.lang.Integer
      +- LocalTableScan [value#1]

// `AssertNotNull` in `SerializeFromObject` will fail;
scala> ds.map(v => (v, v)).show()
java.lang.NullPointerException: Null value appeared in non-nullable fails:
top level Product input object
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

// When the whole-stage codegen disabled, the query works well;
scala> sql("SET spark.sql.codegen.wholeStage=false")
scala> ds.map(v=>(v,v)).show()
+----+----+
|  _1|  _2|
+----+----+
|   1|   1|
|null|null|
+----+----+
```
A root cause is that `Invoke` used in `MapElementsExec` propagates input null, and then [AssertNotNull](https://github.com/apache/spark/blob/1b780f364bfbb46944fe805a024bb6c32f5d2dde/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L253-L255) in `SerializeFromObject` fails because a top-level row becomes null. So, `MapElementsExec` should not return `null` but `(null, null)`.

NOTE: the generated code of the query above in the current master;
```
/* 033 */   private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
/* 034 */     boolean mapelements_isNull_1 = true;
/* 035 */     scala.Tuple2 mapelements_value_1 = null;
/* 036 */     if (!false) {
/* 037 */       mapelements_resultIsNull_0 = false;
/* 038 */
/* 039 */       if (!mapelements_resultIsNull_0) {
/* 040 */         mapelements_resultIsNull_0 = mapelements_exprIsNull_0_0;
/* 041 */         mapelements_mutableStateArray_0[0] = mapelements_expr_0_0;
/* 042 */       }
/* 043 */
/* 044 */       mapelements_isNull_1 = mapelements_resultIsNull_0;
/* 045 */       if (!mapelements_isNull_1) {
/* 046 */         Object mapelements_funcResult_0 = null;
/* 047 */         mapelements_funcResult_0 = ((scala.Function1) references[1] /* literal */).apply(mapelements_mutableStateArray_0[0]);
/* 048 */
/* 049 */         if (mapelements_funcResult_0 != null) {
/* 050 */           mapelements_value_1 = (scala.Tuple2) mapelements_funcResult_0;
/* 051 */         } else {
/* 052 */           mapelements_isNull_1 = true;
/* 053 */         }
/* 054 */
/* 055 */       }
/* 056 */     }
/* 057 */
/* 058 */     serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
/* 059 */
/* 060 */   }
```

The generated code w/ this fix;
```
/* 032 */   private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
/* 033 */     boolean mapelements_isNull_1 = true;
/* 034 */     scala.Tuple2 mapelements_value_1 = null;
/* 035 */     if (!false) {
/* 036 */       mapelements_mutableStateArray_0[0] = mapelements_expr_0_0;
/* 037 */
/* 038 */       mapelements_isNull_1 = false;
/* 039 */       if (!mapelements_isNull_1) {
/* 040 */         Object mapelements_funcResult_0 = null;
/* 041 */         mapelements_funcResult_0 = ((scala.Function1) references[1] /* literal */).apply(mapelements_mutableStateArray_0[0]);
/* 042 */
/* 043 */         if (mapelements_funcResult_0 != null) {
/* 044 */           mapelements_value_1 = (scala.Tuple2) mapelements_funcResult_0;
/* 045 */           mapelements_isNull_1 = false;
/* 046 */         } else {
/* 047 */           mapelements_isNull_1 = true;
/* 048 */         }
/* 049 */
/* 050 */       }
/* 051 */     }
/* 052 */
/* 053 */     serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
/* 054 */
/* 055 */   }
```

This comes from #28681

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests.

Closes #28691 from maropu/SPARK-31854-BRANCH2.4.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants