Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] OpenSearch crashes on closed client connection before search reply when total ops higher compared to expected #4144

Merged
merged 1 commit into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,16 @@ public void onFailure(Exception t) {
* It is possible to run into connection exceptions here because we are getting the connection early and might
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shardIt, e));
fork(() -> {
// It only happens when onPhaseDone() is called and executePhaseOnShard() fails hard with an exception.
// In this case calling onShardFailure() would overflow the operations counter, so the best we could do
// here is to fail the phase and move on to the next one.
if (totalOps.get() == expectedTotalOps) {
onPhaseFailure(this, "The phase has failed", e);
} else {
onShardFailure(shardIndex, shard, shardIt, e);
}
});
} finally {
executeNext(pendingExecutions, thread);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,57 @@ public void onFailure(Exception e) {
assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length));
}

public void testExecutePhaseOnShardFailure() throws InterruptedException {
final Index index = new Index("test", UUID.randomUUID().toString());

final SearchShardIterator[] shards = IntStream.range(0, 2 + randomInt(3))
.mapToObj(i -> new SearchShardIterator(null, new ShardId(index, i), List.of("n1", "n2", "n3"), null, null, null))
.toArray(SearchShardIterator[]::new);

final AtomicBoolean fail = new AtomicBoolean(true);
final CountDownLatch latch = new CountDownLatch(1);
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
searchRequest.setMaxConcurrentShardRequests(5);

final ArraySearchPhaseResults<SearchPhaseResult> queryResult = new ArraySearchPhaseResults<>(shards.length);
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(
searchRequest,
queryResult,
new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {}

@Override
public void onFailure(Exception e) {
try {
// We end up here only when onPhaseDone() is called (causing NPE) and
// ending up in the onPhaseFailure() callback
if (fail.compareAndExchange(true, false)) {
assertThat(e, instanceOf(SearchPhaseExecutionException.class));
throw new RuntimeException("Simulated exception");
}
} finally {
executor.submit(() -> latch.countDown());
}
}
},
false,
false,
new AtomicLong(),
shards
);
action.run();
assertTrue(latch.await(1, TimeUnit.SECONDS));

InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, action.buildShardFailures(), null, null);
assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations());
assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest());
assertSame(searchResponse.getProfileResults(), internalSearchResponse.profile());
assertSame(searchResponse.getHits(), internalSearchResponse.hits());
assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length));
}

private static final class PhaseResult extends SearchPhaseResult {
PhaseResult(ShardSearchContextId contextId) {
this.contextId = contextId;
Expand Down