diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4aea442bc3ce1..115f0663ef2b7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -281,12 +281,7 @@ class SparkContext(config: SparkConf) extends Logging {
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus): SparkEnv = {
-    SparkEnv.createDriverEnv(
-      conf,
-      isLocal,
-      listenerBus,
-      SparkContext.numDriverCores(master, conf),
-      this)
+    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
   }
 
   private[spark] def env: SparkEnv = _env
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 272a0a6332bbe..edad91a0c6f0d 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -169,7 +169,6 @@ object SparkEnv extends Logging {
       isLocal: Boolean,
       listenerBus: LiveListenerBus,
       numCores: Int,
-      sparkContext: SparkContext,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
     assert(conf.contains(DRIVER_HOST_ADDRESS),
       s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
@@ -192,7 +191,6 @@ object SparkEnv extends Logging {
       numCores,
       ioEncryptionKey,
       listenerBus = listenerBus,
-      Option(sparkContext),
       mockOutputCommitCoordinator = mockOutputCommitCoordinator
     )
   }
@@ -237,7 +235,6 @@ object SparkEnv extends Logging {
   /**
    * Helper method to create a SparkEnv for a driver or an executor.
    */
-  // scalastyle:off argcount
   private def create(
       conf: SparkConf,
       executorId: String,
@@ -248,9 +245,7 @@ object SparkEnv extends Logging {
       numUsableCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       listenerBus: LiveListenerBus = null,
-      sc: Option[SparkContext] = None,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
-    // scalastyle:on argcount
 
     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
 
@@ -396,12 +391,7 @@ object SparkEnv extends Logging {
     }
 
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
-      if (isDriver) {
-        new OutputCommitCoordinator(conf, isDriver, sc)
-      } else {
-        new OutputCommitCoordinator(conf, isDriver)
-      }
-
+      new OutputCommitCoordinator(conf, isDriver)
     }
     val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index cd5d6b8f9c90d..a5858ebf9cdcc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -44,10 +44,7 @@ private case class AskPermissionToCommitOutput(
  * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
  * for an extensive design discussion.
  */
-private[spark] class OutputCommitCoordinator(
-    conf: SparkConf,
-    isDriver: Boolean,
-    sc: Option[SparkContext] = None) extends Logging {
+private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
 
   // Initialized by SparkEnv
   var coordinatorRef: Option[RpcEndpointRef] = None
@@ -158,10 +155,9 @@ private[spark] class OutputCommitCoordinator(
         val taskId = TaskIdentifier(stageAttempt, attemptNumber)
         stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
         if (stageState.authorizedCommitters(partition) == taskId) {
-          sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " +
-            s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
-            s"but task commit success, data duplication may happen. " +
-            s"reason=$reason"))
+          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+            s"partition=$partition) failed; clearing lock")
+          stageState.authorizedCommitters(partition) = null
         }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index 45da750768fa9..7d063c3b3ac53 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
+import org.scalatest.time.{Seconds, Span}
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, TaskContext}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}
 
 /**
  * Integration tests for the OutputCommitCoordinator.
@@ -44,15 +45,13 @@ class OutputCommitCoordinatorIntegrationSuite
     sc = new SparkContext("local[2, 4]", "test", conf)
   }
 
-  test("SPARK-39195: exception thrown in OutputCommitter.commitTask()") {
+  test("exception thrown in OutputCommitter.commitTask()") {
     // Regression test for SPARK-10381
-    val e = intercept[SparkException] {
+    failAfter(Span(60, Seconds)) {
       withTempDir { tempDir =>
         sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
       }
-    }.getCause.getMessage
-    assert(e.contains("failed; but task commit success, data duplication may happen.") &&
-      e.contains("Intentional exception"))
+    }
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 44dc9a5f97dab..d84892be14af5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -87,12 +87,11 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
           isLocal: Boolean,
           listenerBus: LiveListenerBus): SparkEnv = {
         outputCommitCoordinator =
-          spy[OutputCommitCoordinator](
-            new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
+          spy[OutputCommitCoordinator](new OutputCommitCoordinator(conf, isDriver = true))
         // Use Mockito.spy() to maintain the default infrastructure everywhere else.
         // This mocking allows us to control the coordinator responses in test cases.
         SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
-          SparkContext.numDriverCores(master), this, Some(outputCommitCoordinator))
+          SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
       }
     }
     // Use Mockito.spy() to maintain the default infrastructure everywhere else
@@ -190,9 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should not be allowed to become stage failed because of potential data duplication
-    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+    // A new task should now be allowed to become the authorized committer
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
       nonAuthorizedCommitter + 2))
+    // There can only be one authorized committer
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter + 3))
   }
 
   test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
@@ -226,8 +228,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
 
     // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
-    // then fail the 1st attempt and since stage failed because of potential data duplication,
-    // make sure fail the 4th attempt.
+    // then fail the 1st attempt and make sure the 4th one can commit again.
     stage += 1
     outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ -236,9 +237,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    // A new task should not be allowed to become the authorized committer since stage failed
-    // because of potential data duplication
-    assert(!outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
+    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
   }
 
   test("SPARK-24589: Make sure stage state is cleaned up") {
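For context, the commit protocol this patch restores in OutputCommitCoordinator can be summarized outside of Spark: the first task attempt that asks for a partition takes the commit lock, later attempts are refused while the lock is held, and when the authorized attempt fails the lock is cleared so a subsequent attempt can commit instead of the stage being failed. The sketch below is a minimal standalone illustration of that behavior under those assumptions, not the actual Spark classes; the object name CommitLockSketch and its methods are hypothetical.

```scala
import scala.collection.mutable

// Standalone sketch of the commit-lock semantics restored by this diff.
object CommitLockSketch {
  // partition -> attempt currently holding the commit lock (None = lock is free)
  private val authorized = mutable.Map.empty[Int, Option[Int]]
  // partition -> attempts that have already failed and may never commit
  private val failed = mutable.Map.empty[Int, mutable.Set[Int]]

  def canCommit(partition: Int, attempt: Int): Boolean = {
    if (failed.getOrElse(partition, mutable.Set.empty[Int]).contains(attempt)) {
      false // a failed attempt is never authorized (mirrors the SPARK-19631 test)
    } else authorized.getOrElse(partition, None) match {
      case None =>
        authorized(partition) = Some(attempt) // first asker takes the lock
        true
      case Some(holder) =>
        holder == attempt // only the current holder may commit
    }
  }

  def taskFailed(partition: Int, attempt: Int): Unit = {
    failed.getOrElseUpdate(partition, mutable.Set.empty[Int]) += attempt
    // Clearing the lock lets another attempt become the authorized committer,
    // rather than failing the whole stage.
    if (authorized.getOrElse(partition, None).contains(attempt)) {
      authorized(partition) = None
    }
  }

  def main(args: Array[String]): Unit = {
    assert(canCommit(partition = 0, attempt = 1))  // attempt 1 takes the lock
    assert(!canCommit(partition = 0, attempt = 2)) // attempt 2 is refused
    taskFailed(partition = 0, attempt = 1)         // the holder fails, lock cleared
    assert(canCommit(partition = 0, attempt = 2))  // a new attempt can now commit
    assert(!canCommit(partition = 0, attempt = 3)) // but still only one at a time
    println("commit-lock sketch OK")
  }
}
```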