[SPARK-43242][CORE] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose

### What changes were proposed in this pull request?
A minor bugfix in `ShuffleBlockFetcherIterator.diagnose`, which does not handle the `ShuffleBlockBatchId` type properly.

### Why are the changes needed?
`.diagnose()` is called inside an exception-handling try-catch block; throwing a new exception because of an unhandled type (in this case, `ShuffleBlockBatchId`) swallows the original exception stack.
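
For illustration, here is a minimal, self-contained sketch of that failure mode (all names — `DiagnoseSketch`, `diagnose`, `fetch` — and the simplified `BlockId` hierarchy are hypothetical stand-ins, not Spark's actual API): because the diagnosis helper runs inside a catch block, an exception it throws for an unhandled `BlockId` subtype replaces the original corruption error.

```scala
object DiagnoseSketch {
  sealed trait BlockId
  case class ShuffleBlockId(mapId: Int) extends BlockId
  case class ShuffleBlockBatchId(mapId: Int) extends BlockId

  // Simplified stand-in for the diagnosis helper: before the fix, an unhandled
  // BlockId subtype fell through to the catch-all and threw from inside the
  // error-handling path.
  def diagnose(blockId: BlockId): String = blockId match {
    case b: ShuffleBlockId => s"checksum diagnosis for $b"
    case unexpected: BlockId =>
      throw new IllegalStateException(s"Unexpected type of BlockId, $unexpected")
  }

  def fetch(blockId: BlockId): Unit = {
    try {
      // The real failure: the fetched block turns out to be corrupted.
      throw new java.io.IOException("stream is corrupted")
    } catch {
      case cause: java.io.IOException =>
        // If diagnose() throws here, `cause` (the corruption error the caller
        // actually needs) never reaches the exception that is finally thrown.
        val diagnosis = diagnose(blockId)
        throw new RuntimeException(diagnosis, cause)
    }
  }

  def main(args: Array[String]): Unit = {
    // Surfaces the IllegalStateException instead of the original IOException.
    fetch(ShuffleBlockBatchId(0))
  }
}
```

With the fix, the `ShuffleBlockBatchId` case logs a warning and returns a skip message instead of throwing, so the original exception is preserved (see the diff below).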

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

Closes apache#40921 from CavemanIV/SPARK-43242.

Lead-authored-by: zhangliang <zhangliang@trip.com>
Co-authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Yi Wu <yi.wu@databricks.com>
2 people authored and Ngone51 committed Aug 27, 2024
1 parent c106c77 commit 1bdbe14
Showing 2 changed files with 33 additions and 0 deletions.
@@ -1156,6 +1156,12 @@ final class ShuffleBlockFetcherIterator(
log"checksum support for push-based shuffle.")
s"BlockChunk $shuffleBlockChunk is corrupted but corruption " +
s"diagnosis is skipped due to lack of shuffle checksum support for push-based shuffle."
case shuffleBlockBatch: ShuffleBlockBatchId =>
logWarning(log"BlockBatch ${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockBatch)} is corrupted " +
log"but corruption diagnosis is skipped due to lack of shuffle checksum support for " +
log"ShuffleBlockBatchId")
s"BlockBatch $shuffleBlockBatch is corrupted but corruption " +
s"diagnosis is skipped due to lack of shuffle checksum support for ShuffleBlockBatchId"
case unexpected: BlockId =>
throw SparkException.internalError(
s"Unexpected type of BlockId, $unexpected", category = "STORAGE")
@@ -1943,4 +1943,31 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester

assert(err2.getMessage.contains("corrupt at reset"))
}

test("SPARK-43242: Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose") {
val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
val blocks = Map[BlockId, ManagedBuffer](
ShuffleBlockBatchId(0, 0, 0, 3) -> createMockManagedBuffer())
answerFetchBlocks { invocation =>
val listener = invocation.getArgument[BlockFetchingListener](4)
listener.onBlockFetchSuccess(ShuffleBlockBatchId(0, 0, 0, 3).toString, mockCorruptBuffer())
}

val logAppender = new LogAppender("diagnose corruption")
withLogAppender(logAppender) {
val iterator = createShuffleBlockIteratorWithDefaults(
Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)),
streamWrapperLimitSize = Some(100)
)
intercept[FetchFailedException](iterator.next())
verify(transfer, times(2))
.fetchBlocks(any(), any(), any(), any(), any(), any())
assert(logAppender.loggingEvents.count(
_.getMessage.getFormattedMessage.contains("Start corruption diagnosis")) === 1)
assert(logAppender.loggingEvents.exists(
_.getMessage.getFormattedMessage.contains("shuffle_0_0_0_3 is corrupted " +
"but corruption diagnosis is skipped due to lack of " +
"shuffle checksum support for ShuffleBlockBatchId")))
}
}
}
