-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-48743][SQL][SS] MergingSessionIterator should better handle when getStruct returns null #47134
[SPARK-48743][SQL][SS] MergingSessionIterator should better handle when getStruct returns null #47134
Conversation
cc @HeartSaVioR @sigmod PTAL! Thank you! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only minor comment.
@@ -118,7 +118,9 @@ class MergingSessionsIterator( | |||
val inputRow = inputIterator.next() | |||
nextGroupingKey = groupingWithoutSessionProjection(inputRow).copy() | |||
val session = sessionProjection(inputRow) | |||
nextGroupingSession = session.getStruct(0, 2).copy() | |||
val groupingSession = session.getStruct(0, 2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- We have another place to have the same possibility of NPE, processCurrentSortedGroup(). We may want to extract the logic to apply the same to both places.
- I guess it's OK to set errorOnIterator = true and throw internalError "here" instead of wrapping the call of initialize() with try-catch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another minor.
@@ -118,7 +118,11 @@ class MergingSessionsIterator( | |||
val inputRow = inputIterator.next() | |||
nextGroupingKey = groupingWithoutSessionProjection(inputRow).copy() | |||
val session = sessionProjection(inputRow) | |||
nextGroupingSession = session.getStruct(0, 2).copy() | |||
val groupingSession = session.getStruct(0, 2) | |||
if (groupingSession == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's be conservative, errorOnIterator = true
to invalidate the iterator. Please apply the same to below as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah thanks! addressed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 pending CI
Merged to master. |
…en getStruct returns null ### What changes were proposed in this pull request? The getStruct() method used in `MergingSessionIterator.initialize` could return a null value. When that happens, the copy() called upon it throws a NullPointerException. We see an exception thrown there: ``` ava.lang.NullPointerException: <Redacted Exception Message> at org.apache.spark.sql.execution.aggregate.MergingSessionsIterator.initialize(MergingSessionsIterator.scala:121) at org.apache.spark.sql.execution.aggregate.MergingSessionsIterator.<init>(MergingSessionsIterator.scala:130) at org.apache.spark.sql.execution.aggregate.MergingSessionsExec.$anonfun$doExecute$1(MergingSessionsExec.scala:93) at org.apache.spark.sql.execution.aggregate.MergingSessionsExec.$anonfun$doExecute$1$adapted(MergingSessionsExec.scala:72) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:920) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:920) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:189) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:154) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:148) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:101) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:984) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:987) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:879) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ``` It is still not clear why that field could be null, but in general Spark should not throw NPEs. So this PR purposes to wrap it with SparkException.internalError with more details. ### Why are the changes needed? Improvemtns ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is a hard-to repro issue. The change should not cause any harm. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#47134 from WweiL/SPARK-48743-mergingSessionIterator-null-init. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…en getStruct returns null ### What changes were proposed in this pull request? The getStruct() method used in `MergingSessionIterator.initialize` could return a null value. When that happens, the copy() called upon it throws a NullPointerException. We see an exception thrown there: ``` ava.lang.NullPointerException: <Redacted Exception Message> at org.apache.spark.sql.execution.aggregate.MergingSessionsIterator.initialize(MergingSessionsIterator.scala:121) at org.apache.spark.sql.execution.aggregate.MergingSessionsIterator.<init>(MergingSessionsIterator.scala:130) at org.apache.spark.sql.execution.aggregate.MergingSessionsExec.$anonfun$doExecute$1(MergingSessionsExec.scala:93) at org.apache.spark.sql.execution.aggregate.MergingSessionsExec.$anonfun$doExecute$1$adapted(MergingSessionsExec.scala:72) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:920) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:920) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:189) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:154) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:148) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:101) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:984) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:987) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:879) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ``` It is still not clear why that field could be null, but in general Spark should not throw NPEs. So this PR purposes to wrap it with SparkException.internalError with more details. ### Why are the changes needed? Improvemtns ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is a hard-to repro issue. The change should not cause any harm. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#47134 from WweiL/SPARK-48743-mergingSessionIterator-null-init. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…en getStruct returns null ### What changes were proposed in this pull request? The getStruct() method used in `MergingSessionIterator.initialize` could return a null value. When that happens, the copy() called upon it throws a NullPointerException. We see an exception thrown there: ``` ava.lang.NullPointerException: <Redacted Exception Message> at org.apache.spark.sql.execution.aggregate.MergingSessionsIterator.initialize(MergingSessionsIterator.scala:121) at org.apache.spark.sql.execution.aggregate.MergingSessionsIterator.<init>(MergingSessionsIterator.scala:130) at org.apache.spark.sql.execution.aggregate.MergingSessionsExec.$anonfun$doExecute$1(MergingSessionsExec.scala:93) at org.apache.spark.sql.execution.aggregate.MergingSessionsExec.$anonfun$doExecute$1$adapted(MergingSessionsExec.scala:72) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:920) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:920) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406) at org.apache.spark.rdd.RDD.iterator(RDD.scala:373) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:189) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:154) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:148) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:101) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:984) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:987) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:879) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ``` It is still not clear why that field could be null, but in general Spark should not throw NPEs. So this PR purposes to wrap it with SparkException.internalError with more details. ### Why are the changes needed? Improvemtns ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is a hard-to repro issue. The change should not cause any harm. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#47134 from WweiL/SPARK-48743-mergingSessionIterator-null-init. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
The getStruct() method used in
MergingSessionIterator.initialize
could return a null value. When that happens, the copy() called upon it throws a NullPointerException.We see an exception thrown there:
It is still not clear why that field could be null, but in general Spark should not throw NPEs. So this PR purposes to wrap it with SparkException.internalError with more details.
Why are the changes needed?
Improvemtns
Does this PR introduce any user-facing change?
No
How was this patch tested?
This is a hard-to repro issue. The change should not cause any harm.
Was this patch authored or co-authored using generative AI tooling?
No