Skip to content

Commit

Permalink
[SPARK-26069][TESTS] Fix flaky test: RpcIntegrationSuite.sendRpcWithS…
Browse files Browse the repository at this point in the history
…treamFailures

## What changes were proposed in this pull request?

The test failure is because `assertErrorAndClosed` misses one possible error message: `java.nio.channels.ClosedChannelException`. This happens when the second `uploadStream` is called after the channel has been closed. This can be reproduced by adding `Thread.sleep(1000)` below this line: https://github.com/apache/spark/blob/03306a6df39c9fd6cb581401c13c4dfc6bbd632e/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java#L217

This PR fixes the above issue and also improves the test failure messages of `assertErrorAndClosed`.

## How was this patch tested?

Jenkins

Closes #23041 from zsxwing/SPARK-26069.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
  • Loading branch information
zsxwing committed Nov 16, 2018
1 parent a2fc48c commit 99cbc51
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,23 +371,29 @@ private void assertErrorsContain(Set<String> errors, Set<String> contains) {

private void assertErrorAndClosed(RpcResult result, String expectedError) {
assertTrue("unexpected success: " + result.successMessages, result.successMessages.isEmpty());
// we expect 1 additional error, which contains *either* "closed" or "Connection reset"
// we expect 1 additional error, which should contain one of the follow messages:
// - "closed"
// - "Connection reset"
// - "java.nio.channels.ClosedChannelException"
Set<String> errors = result.errorMessages;
assertEquals("Expected 2 errors, got " + errors.size() + "errors: " +
errors, 2, errors.size());

Set<String> containsAndClosed = Sets.newHashSet(expectedError);
containsAndClosed.add("closed");
containsAndClosed.add("Connection reset");
containsAndClosed.add("java.nio.channels.ClosedChannelException");

Pair<Set<String>, Set<String>> r = checkErrorsContain(errors, containsAndClosed);

Set<String> errorsNotFound = r.getRight();
assertEquals(1, errorsNotFound.size());
String err = errorsNotFound.iterator().next();
assertTrue(err.equals("closed") || err.equals("Connection reset"));
assertTrue("Got a non-empty set " + r.getLeft(), r.getLeft().isEmpty());

assertTrue(r.getLeft().isEmpty());
Set<String> errorsNotFound = r.getRight();
assertEquals(
"The size of " + errorsNotFound.toString() + " was not 2", 2, errorsNotFound.size());
for (String err: errorsNotFound) {
assertTrue("Found a wrong error " + err, containsAndClosed.contains(err));
}
}

private Pair<Set<String>, Set<String>> checkErrorsContain(
Expand Down

0 comments on commit 99cbc51

Please sign in to comment.