Skip to content

Commit

Permalink
[SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails…
Browse files Browse the repository at this point in the history
… to initiate retry

### What changes were proposed in this pull request?
This PR fixes a bug in `RetryingBlockTransferor` that happens when retry initiation has failed.

With this patch, the callers of `RetryingBlockTransfeathror#initiateRetry()` will catch any error and invoke the parent listener's exception handler.

### Why are the changes needed?
This is needed to prevent an edge case where retry initiation fails and executor gets stuck.

More details in SPARK-44756

### Does this PR introduce _any_ user-facing change?
N/A

### How was this patch tested?
Added a new test case in `RetryingBlockTransferorSuite` that simulates the problematic scenario.

<img width="772" alt="image" src="https://github.com/apache/spark/assets/17327104/f20ec327-f5c9-4d74-b861-1ea4e05eb46b">

Closes #42426 from hdaikoku/SPARK-44756.

Authored-by: Harunobu Daikoku <harunobu.daikoku@agoda.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
hdaikoku authored and Mridul Muralidharan committed Sep 26, 2023
1 parent 60d02b4 commit ff084d2
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public RetryingBlockTransferor(
this(conf, transferStarter, blockIds, listener, ErrorHandler.NOOP_ERROR_HANDLER);
}

@VisibleForTesting
synchronized void setCurrentListener(RetryingBlockTransferListener listener) {
this.currentListener = listener;
}

/**
* Initiates the transfer of all blocks provided in the constructor, with possible retries
* in the event of transient IOExceptions.
Expand Down Expand Up @@ -176,21 +181,25 @@ private void transferAllOutstanding() {
listener.getTransferType(), blockIdsToTransfer.length,
numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);

if (shouldRetry(e)) {
initiateRetry(e);
} else {
for (String bid : blockIdsToTransfer) {
listener.onBlockTransferFailure(bid, e);
}
if (shouldRetry(e) && initiateRetry(e)) {
// successfully initiated a retry
return;
}

// retry is not possible, so fail remaining blocks
for (String bid : blockIdsToTransfer) {
listener.onBlockTransferFailure(bid, e);
}
}
}

/**
* Lightweight method which initiates a retry in a different thread. The retry will involve
* calling transferAllOutstanding() after a configured wait time.
* Returns true if the retry was successfully initiated, false otherwise.
*/
private synchronized void initiateRetry(Throwable e) {
@VisibleForTesting
synchronized boolean initiateRetry(Throwable e) {
if (enableSaslRetries && e instanceof SaslTimeoutException) {
saslRetryCount += 1;
}
Expand All @@ -201,10 +210,17 @@ private synchronized void initiateRetry(Throwable e) {
listener.getTransferType(), retryCount, maxRetries, outstandingBlocksIds.size(),
retryWaitTime);

executorService.submit(() -> {
Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
transferAllOutstanding();
});
try {
executorService.execute(() -> {
Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
transferAllOutstanding();
});
} catch (Throwable t) {
logger.error("Exception while trying to initiate retry", t);
return false;
}

return true;
}

/**
Expand Down Expand Up @@ -240,7 +256,8 @@ public int getRetryCount() {
* listener. Note that in the event of a retry, we will immediately replace the 'currentListener'
* field, indicating that any responses from non-current Listeners should be ignored.
*/
private class RetryingBlockTransferListener implements
@VisibleForTesting
class RetryingBlockTransferListener implements
BlockFetchingListener, BlockPushingListener {
private void handleBlockTransferSuccess(String blockId, ManagedBuffer data) {
// We will only forward this success message to our parent listener if this block request is
Expand Down Expand Up @@ -274,7 +291,11 @@ private void handleBlockTransferFailure(String blockId, Throwable exception) {
synchronized (RetryingBlockTransferor.this) {
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
if (shouldRetry(exception)) {
initiateRetry(exception);
if (!initiateRetry(exception)) {
// failed to initiate a retry, so fail this block
outstandingBlocksIds.remove(blockId);
shouldForwardFailure = true;
}
} else {
if (errorHandler.shouldLogError(exception)) {
logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,32 @@ public void testIOExceptionFailsConnectionEvenWithSaslException()
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
}

@Test
public void testRetryInitiationFailure() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
// IOException will initiate a retry, but the initiation will fail
ImmutableMap.<String, Object>builder()
.put("b0", new IOException("Connection failed or something"))
.put("b1", block1)
.build()
);

configureInteractions(interactions, listener);
_retryingBlockTransferor = spy(_retryingBlockTransferor);
// Simulate a failure to initiate a retry.
doReturn(false).when(_retryingBlockTransferor).initiateRetry(any());
// Override listener, so that it delegates to the spied instance and not the original class.
_retryingBlockTransferor.setCurrentListener(
_retryingBlockTransferor.new RetryingBlockTransferListener());
_retryingBlockTransferor.start();

verify(listener, timeout(5000)).onBlockTransferFailure(eq("b0"), any());
verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
verifyNoMoreInteractions(listener);
}

/**
* Performs a set of interactions in response to block requests from a RetryingBlockFetcher.
* Each interaction is a Map from BlockId to either ManagedBuffer or Exception. This interaction
Expand All @@ -384,6 +410,13 @@ public void testIOExceptionFailsConnectionEvenWithSaslException()
*/
private static void performInteractions(List<? extends Map<String, Object>> interactions,
BlockFetchingListener listener)
throws IOException, InterruptedException {
configureInteractions(interactions, listener);
_retryingBlockTransferor.start();
}

private static void configureInteractions(List<? extends Map<String, Object>> interactions,
BlockFetchingListener listener)
throws IOException, InterruptedException {

MapConfigProvider provider = new MapConfigProvider(configMap);
Expand Down Expand Up @@ -440,6 +473,5 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
_retryingBlockTransferor =
new RetryingBlockTransferor(conf, fetchStarter, blockIdArray, listener);
_retryingBlockTransferor.start();
}
}

0 comments on commit ff084d2

Please sign in to comment.