
Override useCommitCoordinator to false #9017

Merged (2 commits, Nov 10, 2023)

Conversation

huaxingao
Contributor

In Spark's BatchWrite.java, useCommitCoordinator defaults to true. This PR overrides useCommitCoordinator to return false, so that we can retry a task after it fails. Here is the issue

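For readers outside the codebase, here is a minimal self-contained sketch of the shape of the change. The `BatchWrite` interface below is a local stand-in for Spark's `org.apache.spark.sql.connector.write.BatchWrite` (whose `useCommitCoordinator()` is a default method returning true), and `IcebergLikeBatchWrite` is a hypothetical name, not the actual Iceberg class:

```java
public class Main {
    // Stand-in for Spark's DSv2 BatchWrite interface: the real one
    // declares useCommitCoordinator() as a default method returning true.
    interface BatchWrite {
        default boolean useCommitCoordinator() {
            return true;
        }
    }

    // Mirrors the spirit of this PR: opt out of the driver-side commit
    // coordinator so a failed task attempt can be retried and still commit.
    static class IcebergLikeBatchWrite implements BatchWrite {
        @Override
        public boolean useCommitCoordinator() {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(new BatchWrite() {}.useCommitCoordinator());         // prints true
        System.out.println(new IcebergLikeBatchWrite().useCommitCoordinator()); // prints false
    }
}
```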
@github-actions github-actions bot added the spark label Nov 9, 2023
@aokolnychyi
Contributor

aokolnychyi commented Nov 9, 2023

@huaxingao, shall we also override these places?

SparkShufflingDataRewriter
SparkPositionDeltaWrite

@huaxingao
Contributor Author

@aokolnychyi

I just overrode useCommitCoordinator in PositionDeltaBatchWrite inside SparkPositionDeltaWrite.

SparkShufflingDataRewriter uses OrderedWrite, not BatchWrite, so I think it is fine.

Contributor

@aokolnychyi aokolnychyi left a comment

This looks good to me.

I think this change is safe because we don't strictly require a single task attempt to commit. We only require that Spark picks one commit message if multiple task attempts succeed. That's true even if we disable the commit coordinator.
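A toy model of that invariant (hypothetical code, not Spark's actual scheduler logic): even if speculative or retried attempts for the same partition all return a commit message, the driver only ever keeps one message per partition, so disabling the coordinator does not lead to double commits.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CommitPick {
    // A (partition, attempt) pair standing in for a task commit message.
    record TaskCommit(int partition, int attempt) {}

    // Keep the first commit message seen for each partition, mirroring the
    // "Spark picks one commit message" requirement described above.
    static Map<Integer, TaskCommit> pickOnePerPartition(List<TaskCommit> messages) {
        Map<Integer, TaskCommit> chosen = new LinkedHashMap<>();
        for (TaskCommit m : messages) {
            chosen.putIfAbsent(m.partition(), m);
        }
        return chosen;
    }

    public static void main(String[] args) {
        // Partition 1 succeeded twice (original attempt plus a retry).
        List<TaskCommit> messages = List.of(
            new TaskCommit(0, 0),
            new TaskCommit(1, 0),
            new TaskCommit(1, 3));
        System.out.println(pickOnePerPartition(messages).size()); // prints 2
    }
}
```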

I will leave this PR open for a bit to see if anyone has any concerns.

cc @singhpk234 @RussellSpitzer @rdblue

@aokolnychyi aokolnychyi merged commit 6a9c182 into apache:main Nov 10, 2023
35 checks passed
@aokolnychyi
Contributor

Thanks, @huaxingao! Could you do a similar fix for BaseStreamingWrite?

@huaxingao
Contributor Author

Thanks @aokolnychyi

I will have a follow up to fix BaseStreamingWrite.

@huaxingao huaxingao deleted the commit_coordinator branch November 10, 2023 19:42
aokolnychyi pushed a commit that referenced this pull request Nov 11, 2023
This change cherrypicks PR #9017 to Spark 3.4.

Co-authored-by: Huaxin Gao <huaxin.gao@apple.com>
@daoxunwu-vungle

Hello, will this fix be propagated to the 1.3.x and 1.4.x releases?

@jiantao-vungle

jiantao-vungle commented Nov 20, 2023

Hi @huaxingao, we tried to execute a SQL INSERT into an Iceberg table, but encountered the following exception stack. It looks like #8904, but without Iceberg calls in the stack. My questions:

Environment: Iceberg 1.3.1 + Spark 3.4.1

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:63)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 186 in stage 9.0 failed 4 times, most recent failure: Lost task 186.4 in stage 9.0 (TID 2992) (172.26.2.81 executor 20): org.apache.spark.SparkException: Commit denied for partition 186 (task 2992, attempt 4, stage 9.0).
    at org.apache.spark.sql.errors.QueryExecutionErrors$.commitDeniedError(QueryExecutionErrors.scala:929)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:485)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:408)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:382)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:248)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:360)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:359)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:248)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
    at com.xxxxxx.xxxx.BoilerplateSparkMain.withSaveIcebergStagingTable(Boilerplate.scala:1685)
    at com.xxxxxx.xxxx.BoilerplateSparkMain.withSaveIcebergStagingTable$(Boilerplate.scala:1667)
    at com.xxxxxx.xxxx.hbp.notifications_attribution.SparkMain$.withSaveIcebergStagingTable(SparkMain.scala:13)
    at com.xxxxxx.xxxx.hbp.notifications_attribution.SparkMain$.process(SparkMain.scala:156)
    at com.xxxxxx.xxxx.hbp.notifications_attribution.SparkMain$.$anonfun$run$3(SparkMain.scala:221)
    at com.xxxxxx.xxxx.hbp.notifications_attribution.SparkMain$.$anonfun$run$3$adapted(SparkMain.scala:218)
    at com.xxxxxx.xxxx.BoilerplateSparkMain.withCoba2TempViewInRange(Boilerplate.scala:1354)
    at com.xxxxxx.xxxx.BoilerplateSparkMain.withCoba2TempViewInRange$(Boilerplate.scala:1343)
    at com.xxxxxx.xxxx.hbp.notifications_attribution.SparkMain$.withCoba2TempViewInRange(SparkMain.scala:13)
    at com.xxxxxx.xxxx.hbp.notifications_attribution.SparkMain$.$anonfun$run$2(SparkMain.scala:218)
    at com.xxxxxx.xxxx.hbp.notifications_attribution.SparkMain$.$anonfun$run$2$adapted(SparkMain.scala:202)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
    at com.xxxxxx.xxxx.hbp.notifications_attribution.SparkMain$.run(SparkMain.scala:202)
    at com.xxxxxx.xxxx.BoilerplateSparkMain.main(Boilerplate.scala:2625)
    at com.xxxxxx.xxxx.BoilerplateSparkMain.main$(Boilerplate.scala:2605)
    at com.xxxxxx.xxxx.hbp.notifications_attribution.SparkMain$.main(SparkMain.scala:13)
    at com.xxxxxx.xxxx.hbp.notifications_attribution.SparkMain.main(SparkMain.scala)
    ... 6 more
Caused by: org.apache.spark.SparkException: Commit denied for partition 186 (task 2992, attempt 4, stage 9.0).
    at org.apache.spark.sql.errors.QueryExecutionErrors$.commitDeniedError(QueryExecutionErrors.scala:929)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:485)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

@huaxingao
Contributor Author

@jiantao-vungle The problem you encountered is probably the same as the one in this PR. Are you experiencing this issue only in Spark 3.4?

@huaxingao
Contributor Author

@aokolnychyi Do we need to back-port the fix to previous releases?

@Fokko
Contributor

Fokko commented Nov 20, 2023

Looping in @jbonofre here who's preparing the 1.4.3 release.

@jiantao-vungle

@jiantao-vungle The problem you encountered is probably the same as the one in this PR. Are you experiencing this issue only in Spark 3.4?

@huaxingao, actually, it was encountered after we upgraded both Spark (from 3.3.1 to 3.4.1) and Iceberg (from 1.2.0 to 1.3.1).

@jbonofre
Member

Thanks for the report. Let me take a look.

jiantao-vungle pushed a commit to jiantao-vungle/iceberg that referenced this pull request Nov 24, 2023
…9028)

This change cherrypicks PR apache#9017 to Spark 3.4.

Co-authored-by: Huaxin Gao <huaxin.gao@apple.com>
@mukeshkumarkulmi

@aokolnychyi Do we need to back-port the fix to previous releases?

Hi @aokolnychyi We are also facing the same issue on Iceberg 1.3.0 while deleting records from a table using a MERGE command. We recently upgraded to Spark 3.4.0 and started facing this issue. Can we back-port this fix to Iceberg 1.3.0?

@huaxingao
Contributor Author

@mukeshkumarkulmi I will open a PR to backport the fix to 1.3.0

@huaxingao
Contributor Author

@mukeshkumarkulmi actually I am not sure if I can back-port the fix to 1.3.0. Can you cherry-pick the fix to your internal 1.3.0?

7 participants