From 4de52a3919b3d703d336adaa8666f2b27bc69ea8 Mon Sep 17 00:00:00 2001 From: mingji Date: Thu, 26 Dec 2024 17:58:41 +0800 Subject: [PATCH] [CELEBORN-1701][FOLLOWUP] Support stage rerun for shuffle data lost ### What changes were proposed in this pull request? Fix an error that may cause the application master retry stage rerun infinitely. ### Why are the changes needed? Correct the parameters passed. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? GA. Closes #3033 from FMX/b1071-1. Authored-by: mingji Signed-off-by: SteNicholas (cherry picked from commit 52fa151aa4c618ee82370ba3959298b4056ee352) Signed-off-by: SteNicholas --- .../celeborn/CelebornShuffleReader.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala index 745cc0770b0..f4edc0f2ff3 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala @@ -111,7 +111,7 @@ class CelebornShuffleReader[K, C]( fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition) } catch { case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) => - handleFetchExceptions(shuffleId, 0, ce) + handleFetchExceptions(handle.shuffleId, shuffleId, 0, ce) case e: Throwable => throw e } @@ -254,7 +254,7 @@ class CelebornShuffleReader[K, C]( if (exceptionRef.get() != null) { exceptionRef.get() match { case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) => - handleFetchExceptions(handle.shuffleId, partitionId, ce) + handleFetchExceptions(handle.shuffleId, shuffleId, partitionId, ce) case e => throw e } } @@ -289,7 +289,7 @@ class CelebornShuffleReader[K, C]( iter } catch { case e @ (_: CelebornIOException | _: PartitionUnRetryAbleException) => - handleFetchExceptions(handle.shuffleId, partitionId, e) + handleFetchExceptions(handle.shuffleId, shuffleId, partitionId, e) } } @@ -369,17 +369,21 @@ class CelebornShuffleReader[K, C]( } } - private def handleFetchExceptions(shuffleId: Int, partitionId: Int, ce: Throwable) = { + private def handleFetchExceptions( + appShuffleId: Int, + shuffleId: Int, + partitionId: Int, + ce: Throwable) = { if (throwsFetchFailure && - shuffleClient.reportShuffleFetchFailure(handle.shuffleId, shuffleId)) { + shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId)) { logWarning(s"Handle fetch exceptions for ${shuffleId}-${partitionId}", ce) throw new FetchFailedException( null, - handle.shuffleId, + appShuffleId, -1, -1, partitionId, - SparkUtils.FETCH_FAILURE_ERROR_MSG + handle.shuffleId + "/" + shuffleId, + SparkUtils.FETCH_FAILURE_ERROR_MSG + appShuffleId + "/" + shuffleId, ce) } else throw ce