
[SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status #36564

Closed
wants to merge 53 commits into from
Changes from 24 commits

Commits (53)
19135e0
[SPARK-39195][SQL] Spark should use two step update of outputCommitCo…
AngersZhuuuu May 16, 2022
7062d32
Update
AngersZhuuuu May 17, 2022
f4caa22
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu May 18, 2022
5e6c0be
Spark OutputCommitCoordinator should keep consistent
AngersZhuuuu May 24, 2022
171bd6a
Merge branch 'SPARK-39195' of https://github.com/AngersZhuuuu/spark i…
AngersZhuuuu May 24, 2022
b070c9d
revert
AngersZhuuuu May 24, 2022
796c08c
Update OutputCommitCoordinator.scala
AngersZhuuuu May 24, 2022
1d79aae
Merge remote-tracking branch 'upstream/master' into SPARK-39195
AngersZhuuuu May 25, 2022
5f5729b
Update SparkHadoopMapRedUtil.scala
AngersZhuuuu May 25, 2022
759814b
Update
AngersZhuuuu May 25, 2022
b4e11cc
trigger
AngersZhuuuu May 26, 2022
7642cdb
Update SparkHadoopMapRedUtil.scala
AngersZhuuuu May 27, 2022
e6dce26
Revert "Update SparkHadoopMapRedUtil.scala"
AngersZhuuuu May 27, 2022
6a403d7
Revert "Update"
AngersZhuuuu May 27, 2022
bc1214c
Revert "Update SparkHadoopMapRedUtil.scala"
AngersZhuuuu May 27, 2022
b5d2885
Revert "Update OutputCommitCoordinator.scala"
AngersZhuuuu May 27, 2022
b13dfbe
Revert "Spark OutputCommitCoordinator should keep consistent"
AngersZhuuuu May 27, 2022
ad67d0d
[SPARK-39195][SQL] Spark should use two step update of outputCommitCo…
AngersZhuuuu May 16, 2022
0e366a8
Update
AngersZhuuuu May 17, 2022
11ba4b7
Update
AngersZhuuuu May 27, 2022
cc71ddc
Update OutputCommitCoordinator.scala
AngersZhuuuu May 27, 2022
e7204df
Update OutputCommitCoordinator.scala
AngersZhuuuu May 27, 2022
9426f30
Update
AngersZhuuuu May 27, 2022
60e03f3
Update OutputCommitCoordinator.scala
AngersZhuuuu May 27, 2022
f77c9c3
follow comment
AngersZhuuuu May 30, 2022
c1faddd
Update SQLQuerySuite.scala
AngersZhuuuu Jun 8, 2022
f7b92e1
Update SQLQuerySuite.scala
AngersZhuuuu Jun 8, 2022
ec5ef4b
re-trigger
AngersZhuuuu Jun 8, 2022
58ea1a9
Update
AngersZhuuuu Jun 8, 2022
58b6c0f
Update OutputCommitCoordinator.scala
AngersZhuuuu Jun 8, 2022
beed831
Update
AngersZhuuuu Jun 8, 2022
87fadf0
Update SQLQuerySuite.scala
AngersZhuuuu Jun 8, 2022
46aa5e0
Update DAGScheduler.scala
AngersZhuuuu Jun 8, 2022
3924127
Update core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoo…
AngersZhuuuu Jun 8, 2022
9c24d8e
Follow comment
AngersZhuuuu Jun 9, 2022
60beea1
Update SQLQuerySuite.scala
AngersZhuuuu Jun 9, 2022
c0bbcb8
Follow comment
AngersZhuuuu Jun 9, 2022
35b613a
Update SparkEnv.scala
AngersZhuuuu Jun 9, 2022
7bc078f
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 9, 2022
b52a617
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 9, 2022
ce5ac35
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 9, 2022
7ca7ca3
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 9, 2022
79815ab
Update OutputCommitCoordinatorIntegrationSuite.scala
AngersZhuuuu Jun 9, 2022
c6c73e1
trigegr
AngersZhuuuu Jun 10, 2022
77f124c
update
AngersZhuuuu Jun 11, 2022
56543db
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 11, 2022
b524a87
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 14, 2022
4a1a092
Update OutputCommitCoordinatorIntegrationSuite.scala
AngersZhuuuu Jun 14, 2022
cd404d3
Merge branch 'master' into SPARK-39195
AngersZhuuuu Jun 14, 2022
a6be796
Update OutputCommitCoordinatorIntegrationSuite.scala
AngersZhuuuu Jun 15, 2022
d23bcbc
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 15, 2022
c29c9fb
Revert "Update OutputCommitCoordinatorSuite.scala"
AngersZhuuuu Jun 15, 2022
11d3ef2
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 17, 2022
SparkHadoopMapRedUtil.scala
@@ -76,6 +76,8 @@ object SparkHadoopMapRedUtil extends Logging {

if (canCommit) {
performCommit()
outputCommitCoordinator.commitSuccess(ctx.stageId(),
ctx.stageAttemptNumber(), splitId, ctx.attemptNumber())
} else {
val message =
s"$mrTaskAttemptID: Not committed because the driver did not authorize commit"
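The task-side shape of the two-step protocol this hunk introduces can be sketched as follows. This is a hedged, simplified sketch, not the actual Spark code: `Coordinator` and `commitTask` here are illustrative stand-ins for `OutputCommitCoordinator` and `SparkHadoopMapRedUtil.commitTask`.

```scala
// Hypothetical sketch of the two-step task-side commit flow.
object TwoStepCommitSketch {
  trait Coordinator {
    def canCommit(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean
    def commitSuccess(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Unit
  }

  def commitTask(
      coordinator: Coordinator,
      stage: Int,
      stageAttempt: Int,
      partition: Int,
      attemptNumber: Int)(performCommit: () => Unit): Unit = {
    // Step 1: ask the driver for permission ("first committer wins").
    if (coordinator.canCommit(stage, stageAttempt, partition, attemptNumber)) {
      performCommit()
      // Step 2: report success only after the commit actually finished, so the
      // driver can tell "authorized but not committed" from "already committed".
      coordinator.commitSuccess(stage, stageAttempt, partition, attemptNumber)
    } else {
      throw new IllegalStateException(
        s"Task attempt $attemptNumber not authorized to commit partition $partition")
    }
  }
}
```

The second step is what lets the driver later decide, on task failure, whether the committed file already exists and the stage must therefore be aborted rather than retried.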
OutputCommitCoordinator.scala
@@ -33,6 +33,12 @@ private case class AskPermissionToCommitOutput(
partition: Int,
attemptNumber: Int)

private case class CommitOutputSuccess(
stage: Int,
stageAttempt: Int,
partition: Int,
attemptNumber: Int)

/**
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
* policy.
@@ -55,8 +61,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
// concurrently in two different attempts of the same stage.
private case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

// Class used to track a committer's status once it has been allowed to commit a task.
// A status of false means the committer has been authorized but has not yet committed;
// true means it has sent a CommitOutputSuccess message to the OutputCommitCoordinator.
private case class CommitStatus(taskIdent: TaskIdentifier, status: Boolean)

private case class StageState(numPartitions: Int) {
val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null)
val authorizedCommitters = Array.fill[CommitStatus](numPartitions)(null)
val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]()
}

@@ -100,6 +111,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
val msg = AskPermissionToCommitOutput(stage, stageAttempt, partition, attemptNumber)
coordinatorRef match {
case Some(endpointRef) =>
ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
RpcUtils.askRpcTimeout(conf).duration)
case None =>
@@ -109,6 +121,33 @@
}
}

/**
* Called by tasks to report that committing the output succeeded.
*
* If a task attempt has been authorized to commit and the commit then succeeds,
* the task side must report back so the coordinator marks its commit status as true.
*
* @param stage the stage number
* @param stageAttempt the stage attempt number
* @param partition the partition number
* @param attemptNumber how many times this task has been attempted
*                      (see [[TaskContext.attemptNumber()]])
*/
def commitSuccess(
stage: Int,
stageAttempt: Int,
partition: Int,
attemptNumber: Int): Unit = {
val msg = CommitOutputSuccess(stage, stageAttempt, partition, attemptNumber)
coordinatorRef match {
case Some(endpointRef) =>
endpointRef.send(msg)
case None =>
logError(
"commitSuccess called after coordinator was stopped (is SparkEnv shutdown in progress)?")
}
}

/**
* Called by the DAGScheduler when a stage starts. Initializes the stage's state if it hasn't
* yet been initialized.
@@ -154,10 +193,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
// Mark the attempt as failed to exclude from future commit protocol
val taskId = TaskIdentifier(stageAttempt, attemptNumber)
stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
if (stageState.authorizedCommitters(partition) == taskId) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
stageState.authorizedCommitters(partition) = null
val commitStatus = stageState.authorizedCommitters(partition)
if (commitStatus != null && commitStatus.taskIdent == taskId) {
if (commitStatus.status) {
throw new SparkException(s"Authorized committer (attemptNumber=$attemptNumber, " +
s"stage=$stage, partition=$partition) failed; but task commit success, " +
s"should fail the job")
} else {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
stageState.authorizedCommitters(partition) = null
}
}
}
}
@@ -186,11 +232,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
if (existing == null) {
logDebug(s"Commit allowed for stage=$stage.$stageAttempt, partition=$partition, " +
s"task attempt $attemptNumber")
state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
state.authorizedCommitters(partition) =
CommitStatus(TaskIdentifier(stageAttempt, attemptNumber), false)
true
} else {
logDebug(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
s"already committed by $existing")
s"already committed by ${existing.taskIdent}")
false
}
case None =>
@@ -200,6 +247,42 @@
}
}

private[scheduler] def handleCommitOutputSuccess(
stage: Int,
stageAttempt: Int,
partition: Int,
attemptNumber: Int): Unit = synchronized {
stageStates.get(stage) match {
case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
throw new SparkException(s"Authorized committer (attemptNumber=$attemptNumber, " +
s"stage=$stage, partition=$partition) failed; but task commit success, " +
s"should fail the job")
AngersZhuuuu (PR author) commented: Seems throwing an exception here won't stop the job; any suggestion?

case Some(state) =>
val existing = state.authorizedCommitters(partition)
if (existing == null) {
throw new SparkException(s"Authorized committer (attemptNumber=$attemptNumber, " +
s"stage=$stage, partition=$partition) commit success; partition lock removed " +
s"before receiving CommitOutputSuccess, should fail the job")
} else {
val taskIdent = existing.taskIdent
if (taskIdent.stageAttempt == stageAttempt && taskIdent.taskAttempt == attemptNumber) {
logDebug(s"Task Commit success for stage=$stage.$stageAttempt, " +
s"partition=$partition, task attempt $attemptNumber")
state.authorizedCommitters(partition) =
CommitStatus(TaskIdentifier(stageAttempt, attemptNumber), true)
} else {
throw new SparkException(s"Authorized committer (attemptNumber=$attemptNumber, " +
s"stage=$stage, partition=$partition) commit success; but partition lock not " +
s"consistent, should fail the job")
}
}
case None =>
logDebug(s"Commit status update failed for stage=$stage.$stageAttempt, " +
s"partition=$partition: stage already marked as completed.")
}
}

private def attemptFailed(
stageState: StageState,
stageAttempt: Int,
@@ -223,6 +306,10 @@ private[spark] object OutputCommitCoordinator {
case StopCoordinator =>
logInfo("OutputCommitCoordinator stopped!")
stop()

case CommitOutputSuccess(stage, stageAttempt, partition, attemptNumber) =>
outputCommitCoordinator.handleCommitOutputSuccess(stage, stageAttempt, partition,
attemptNumber)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
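The coordinator-side bookkeeping the hunks above add can be condensed into a small per-partition state machine. The sketch below is a simplified illustration with hypothetical class names (`PartitionLock` is not in the PR); it mirrors the three transitions: authorize, record commit success, and decide on task failure.

```scala
// Hedged sketch of the per-partition commit lock this PR maintains.
object CommitLockSketch {
  case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)
  // status = false: authorized but not yet committed;
  // status = true: a CommitOutputSuccess message has been received.
  case class CommitStatus(taskIdent: TaskIdentifier, status: Boolean)

  final class PartitionLock {
    private var current: Option[CommitStatus] = None

    // AskPermissionToCommitOutput: first committer wins.
    def authorize(id: TaskIdentifier): Boolean = current match {
      case None    => current = Some(CommitStatus(id, status = false)); true
      case Some(_) => false
    }

    // CommitOutputSuccess: flip the flag once the file is actually committed.
    def commitSuccess(id: TaskIdentifier): Unit = current match {
      case Some(CommitStatus(`id`, _)) => current = Some(CommitStatus(id, status = true))
      case _ => throw new IllegalStateException("partition lock not consistent")
    }

    // taskCompleted with a failure: returns true when the stage must be
    // aborted, because the committer failed *after* a successful commit.
    def taskFailed(id: TaskIdentifier): Boolean = current match {
      case Some(CommitStatus(`id`, committed)) =>
        if (!committed) current = None // clear the lock; another attempt may commit
        committed
      case _ => false
    }
  }
}
```

The key design point is the boolean flag: without it, the coordinator cannot distinguish a failed attempt that never wrote its file (safe to retry) from one whose file is already committed (retrying would duplicate or corrupt output, so the stage must abort).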
OutputCommitCoordinatorSuite.scala
@@ -270,6 +270,24 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
.stageStart(meq(retriedStage.head), any())
verify(sc.env.outputCommitCoordinator).stageEnd(meq(retriedStage.head))
}

test("SPARK-39195: Spark should use two step update of outputCommitCoordinator") {
val stage = 1
val taskAttempt = 1
val partition = 1

// The authorized task reports CommitOutputSuccess and then fails: the stage must abort.
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
assert(!outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt + 1))
outputCommitCoordinator.commitSuccess(stage, 1, partition, taskAttempt)
val e1 = intercept[SparkException] {
outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
ExecutorLostFailure("0", exitCausedByApp = true, None))
}.getMessage
assert(e1.contains("Authorized committer (attemptNumber=1, stage=1, partition=1) failed; " +
"but task commit success, should fail the job"))
}
}

/**