Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-2403] Catch all errors during serialization in DAGScheduler #1329

Closed
wants to merge 3 commits into from
Closed

[SPARK-2403] Catch all errors during serialization in DAGScheduler #1329

wants to merge 3 commits into from

Conversation

darabos
Copy link
Contributor

@darabos darabos commented Jul 8, 2014

https://issues.apache.org/jira/browse/SPARK-2403

Spark hangs for us whenever we forget to register a class with Kryo. This should be a simple fix for that. But let me know if you have a better suggestion.

I did not write a new test for this. It would be pretty complicated and I'm not sure it's worthwhile for such a simple change. Let me know if you disagree.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@@ -768,6 +768,10 @@ class DAGScheduler(
abortStage(stage, "Task not serializable: " + e.toString)
runningStages -= stage
return
case e: Throwable => // Other exceptions, such as IllegalArgumentException from Kryo.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please catch NonFatal(e) instead. I think we should catch StackOverflowError here (as that is a possible error during serialization), but we should not catch OOMs and other such throwables except to re-throw them.

NB: Despite what the documentation says, NonFatal does indeed seem to catch StackOverflowError:

scala> NonFatal(new StackOverflowError())
res1: Boolean = true

scala> NonFatal(new OutOfMemoryError())
res2: Boolean = false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect you are testing this on 2.10. Looks like a change in 2.11:

scala/scala@6460365#diff-ff42321ce198f97308744271b7e17c76

I think their argument applies to Spark too. Sounds like it is not safe to try and recover from StackOverflowError.

Thanks for the comments! I'll update the pull request in a moment.

@aarondav
Copy link
Contributor

aarondav commented Jul 8, 2014

Jenkins, ok to test.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@darabos
Copy link
Contributor Author

darabos commented Jul 8, 2014

Thanks! I've added the suggested changes.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@aarondav
Copy link
Contributor

aarondav commented Jul 8, 2014

LGTM. Regarding the initial problem you observed, did you see the actual exception via the DAGScheduler's OneForOneStrategy failure? Or were there no log messages containing the error?

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16408/

@darabos
Copy link
Contributor Author

darabos commented Jul 8, 2014

LGTM. Regarding the initial problem you observed, did you see the actual exception via the DAGScheduler's OneForOneStrategy failure? Or were there no log messages containing the error?

Yes, the exception was logged from OneForOneStrategy. See the stack trace in https://issues.apache.org/jira/browse/SPARK-2403. (Well, except I omitted the first line which names OneForOneStrategy. Sorry about that.)

But after logging that, the system stalled. localhost:4040 was refusing connections and jstack showed just a number of waiting threads. (I've left work now, but I can paste more details tomorrow if you're interested.)

@aarondav
Copy link
Contributor

aarondav commented Jul 8, 2014

Great, thanks! I just wanted to make sure it was actually printed somewhere, although I understand the behavior was not ideal.

@aarondav
Copy link
Contributor

aarondav commented Jul 8, 2014

Merged into master and branch-1.0. Thanks!

@asfgit asfgit closed this in c8a2313 Jul 8, 2014
asfgit pushed a commit that referenced this pull request Jul 8, 2014
https://issues.apache.org/jira/browse/SPARK-2403

Spark hangs for us whenever we forget to register a class with Kryo. This should be a simple fix for that. But let me know if you have a better suggestion.

I did not write a new test for this. It would be pretty complicated and I'm not sure it's worthwhile for such a simple change. Let me know if you disagree.

Author: Daniel Darabos <darabos.daniel@gmail.com>

Closes #1329 from darabos/spark-2403 and squashes the following commits:

3aceaad [Daniel Darabos] Print full stack trace for miscellaneous exceptions during serialization.
52c22ba [Daniel Darabos] Only catch NonFatal exceptions.
361e962 [Daniel Darabos] Catch all errors during serialization in DAGScheduler.

(cherry picked from commit c8a2313)
Signed-off-by: Aaron Davidson <aaron@databricks.com>
@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16409/

gzm55 pushed a commit to MediaV/spark that referenced this pull request Jul 18, 2014
https://issues.apache.org/jira/browse/SPARK-2403

Spark hangs for us whenever we forget to register a class with Kryo. This should be a simple fix for that. But let me know if you have a better suggestion.

I did not write a new test for this. It would be pretty complicated and I'm not sure it's worthwhile for such a simple change. Let me know if you disagree.

Author: Daniel Darabos <darabos.daniel@gmail.com>

Closes apache#1329 from darabos/spark-2403 and squashes the following commits:

3aceaad [Daniel Darabos] Print full stack trace for miscellaneous exceptions during serialization.
52c22ba [Daniel Darabos] Only catch NonFatal exceptions.
361e962 [Daniel Darabos] Catch all errors during serialization in DAGScheduler.
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
https://issues.apache.org/jira/browse/SPARK-2403

Spark hangs for us whenever we forget to register a class with Kryo. This should be a simple fix for that. But let me know if you have a better suggestion.

I did not write a new test for this. It would be pretty complicated and I'm not sure it's worthwhile for such a simple change. Let me know if you disagree.

Author: Daniel Darabos <darabos.daniel@gmail.com>

Closes apache#1329 from darabos/spark-2403 and squashes the following commits:

3aceaad [Daniel Darabos] Print full stack trace for miscellaneous exceptions during serialization.
52c22ba [Daniel Darabos] Only catch NonFatal exceptions.
361e962 [Daniel Darabos] Catch all errors during serialization in DAGScheduler.
maropu pushed a commit that referenced this pull request Nov 17, 2020
…espect to aliases to avoid unneeded exchange/sort nodes

### What changes were proposed in this pull request?
This pull request tries to remove unneeded exchanges/sorts by normalizing the output partitioning and sortorder information correctly with respect to aliases.

Example: consider this join of three tables:

     |SELECT t2id, t3.id as t3id
     |FROM (
     |    SELECT t1.id as t1id, t2.id as t2id
     |    FROM t1, t2
     |    WHERE t1.id = t2.id
     |) t12, t3
     |WHERE t1id = t3.id

The plan for this looks like:

      *(9) Project [t2id#1034L, id#1004L AS t3id#1035L]
      +- *(9) SortMergeJoin [t1id#1033L], [id#1004L], Inner
         :- *(6) Sort [t1id#1033L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(t1id#1033L, 5), true, [id=#1343]   <------------------------------
         :     +- *(5) Project [id#996L AS t1id#1033L, id#1000L AS t2id#1034L]
         :        +- *(5) SortMergeJoin [id#996L], [id#1000L], Inner
         :           :- *(2) Sort [id#996L ASC NULLS FIRST], false, 0
         :           :  +- Exchange hashpartitioning(id#996L, 5), true, [id=#1329]
         :           :     +- *(1) Range (0, 10, step=1, splits=2)
         :           +- *(4) Sort [id#1000L ASC NULLS FIRST], false, 0
         :              +- Exchange hashpartitioning(id#1000L, 5), true, [id=#1335]
         :                 +- *(3) Range (0, 20, step=1, splits=2)
         +- *(8) Sort [id#1004L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#1004L, 5), true, [id=#1349]
               +- *(7) Range (0, 30, step=1, splits=2)

In this plan, the marked exchange could have been avoided as the data is already partitioned on "t1.id". This happens because AliasAwareOutputPartitioning class handles aliases only related to HashPartitioning. This change normalizes all output partitioning based on aliasing happening in Project.

### Why are the changes needed?
To remove unneeded exchanges.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New UT added.

On TPCDS 1000 scale, this change improves the performance of query 95 from 330 seconds to 170 seconds by removing the extra Exchange.

Closes #30300 from prakharjain09/SPARK-33399-outputpartitioning.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
wangyum pushed a commit that referenced this pull request May 26, 2023
…rtitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes (#1092)

* [SPARK-31078][SQL] Respect aliases in output ordering

Currently, in the following scenario, an unnecessary `Sort` node is introduced:
```scala
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  val df = (0 until 20).toDF("i").as("df")
  df.repartition(8, df("i")).write.format("parquet")
    .bucketBy(8, "i").sortBy("i").saveAsTable("t")
  val t1 = spark.table("t")
  val t2 = t1.selectExpr("i as ii")
  t1.join(t2, t1("i") === t2("ii")).explain
}
```
```
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [ii#10 ASC NULLS FIRST], false, 0    <==== UNNECESSARY
   +- *(2) Project [i#8 AS ii#10]
      +- *(2) Filter isnotnull(i#8)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
```
Notice that `Sort [ii#10 ASC NULLS FIRST], false, 0` is introduced even though the underlying data is already sorted. This is because `outputOrdering` doesn't handle aliases correctly. This PR proposes to fix this issue.

To better handle aliases in `outputOrdering`.

Yes, now with the fix, the `explain` prints out the following:
```
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Project [i#8 AS ii#10]
   +- *(2) Filter isnotnull(i#8)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
```

Tests added.

Closes #27842 from imback82/alias_aware_sort_order.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes

This pull request tries to remove unneeded exchanges/sorts by normalizing the output partitioning and sortorder information correctly with respect to aliases.

Example: consider this join of three tables:

     |SELECT t2id, t3.id as t3id
     |FROM (
     |    SELECT t1.id as t1id, t2.id as t2id
     |    FROM t1, t2
     |    WHERE t1.id = t2.id
     |) t12, t3
     |WHERE t1id = t3.id

The plan for this looks like:

      *(9) Project [t2id#1034L, id#1004L AS t3id#1035L]
      +- *(9) SortMergeJoin [t1id#1033L], [id#1004L], Inner
         :- *(6) Sort [t1id#1033L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(t1id#1033L, 5), true, [id=#1343]   <------------------------------
         :     +- *(5) Project [id#996L AS t1id#1033L, id#1000L AS t2id#1034L]
         :        +- *(5) SortMergeJoin [id#996L], [id#1000L], Inner
         :           :- *(2) Sort [id#996L ASC NULLS FIRST], false, 0
         :           :  +- Exchange hashpartitioning(id#996L, 5), true, [id=#1329]
         :           :     +- *(1) Range (0, 10, step=1, splits=2)
         :           +- *(4) Sort [id#1000L ASC NULLS FIRST], false, 0
         :              +- Exchange hashpartitioning(id#1000L, 5), true, [id=#1335]
         :                 +- *(3) Range (0, 20, step=1, splits=2)
         +- *(8) Sort [id#1004L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#1004L, 5), true, [id=#1349]
               +- *(7) Range (0, 30, step=1, splits=2)

In this plan, the marked exchange could have been avoided as the data is already partitioned on "t1.id". This happens because AliasAwareOutputPartitioning class handles aliases only related to HashPartitioning. This change normalizes all output partitioning based on aliasing happening in Project.

To remove unneeded exchanges.

No

New UT added.

On TPCDS 1000 scale, this change improves the performance of query 95 from 330 seconds to 170 seconds by removing the extra Exchange.

Closes #30300 from prakharjain09/SPARK-33399-outputpartitioning.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>

* [CARMEL-6306] Fix ut

* [CARMEL-6306] Fix alias not compatible with ebay skew implementation

Co-authored-by: Terry Kim <yuminkim@gmail.com>
Co-authored-by: Prakhar Jain <prakharjain09@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants