Skip to content

Commit

Permalink
[CELEBORN-1701][FOLLOWUP] Support stage rerun for shuffle data lost
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Fix an error that may cause the application master retry stage rerun infinitely.

### Why are the changes needed?
Correct the parameters passed.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA.

Closes #3033 from FMX/b1071-1.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
(cherry picked from commit 52fa151)
Signed-off-by: SteNicholas <programgeek@163.com>
  • Loading branch information
FMX authored and SteNicholas committed Dec 26, 2024
1 parent 2c7847e commit 4de52a3
Showing 1 changed file with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class CelebornShuffleReader[K, C](
fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)
} catch {
case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
handleFetchExceptions(shuffleId, 0, ce)
handleFetchExceptions(handle.shuffleId, shuffleId, 0, ce)
case e: Throwable => throw e
}

Expand Down Expand Up @@ -254,7 +254,7 @@ class CelebornShuffleReader[K, C](
if (exceptionRef.get() != null) {
exceptionRef.get() match {
case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
handleFetchExceptions(handle.shuffleId, partitionId, ce)
handleFetchExceptions(handle.shuffleId, shuffleId, partitionId, ce)
case e => throw e
}
}
Expand Down Expand Up @@ -289,7 +289,7 @@ class CelebornShuffleReader[K, C](
iter
} catch {
case e @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
handleFetchExceptions(handle.shuffleId, partitionId, e)
handleFetchExceptions(handle.shuffleId, shuffleId, partitionId, e)
}
}

Expand Down Expand Up @@ -369,17 +369,21 @@ class CelebornShuffleReader[K, C](
}
}

private def handleFetchExceptions(shuffleId: Int, partitionId: Int, ce: Throwable) = {
private def handleFetchExceptions(
appShuffleId: Int,
shuffleId: Int,
partitionId: Int,
ce: Throwable) = {
if (throwsFetchFailure &&
shuffleClient.reportShuffleFetchFailure(handle.shuffleId, shuffleId)) {
shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId)) {
logWarning(s"Handle fetch exceptions for ${shuffleId}-${partitionId}", ce)
throw new FetchFailedException(
null,
handle.shuffleId,
appShuffleId,
-1,
-1,
partitionId,
SparkUtils.FETCH_FAILURE_ERROR_MSG + handle.shuffleId + "/" + shuffleId,
SparkUtils.FETCH_FAILURE_ERROR_MSG + appShuffleId + "/" + shuffleId,
ce)
} else
throw ce
Expand Down

0 comments on commit 4de52a3

Please sign in to comment.