Skip to content

Commit

Permalink
Fix testCancelRequestWhenFailingFetchingPages (elastic#106447)
Browse files Browse the repository at this point in the history
If we proceed without waiting for pages, we might cancel the main
request before starting the data-node request. As a result, the exchange
sinks on data-nodes won't be removed until the inactive_timeout elapses,
which is longer than the assertBusy timeout. To avoid this, when the test
did not wait for pages, it now finishes the sink handler explicitly with
the failure.

Closes elastic#106443
  • Loading branch information
dnhatn authored Mar 19, 2024
1 parent edbff94 commit 8cbece4
Showing 1 changed file with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ private void assertCancelled(ActionFuture<EsqlQueryResponse> response) throws Ex
* Ensure that when some exchange requests fail, we cancel the ESQL request, and complete all
* exchange sinks with the failure, despite having outstanding pages in the buffer.
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/106443")
public void testCancelRequestWhenFailingFetchingPages() throws Exception {
String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
String dataNode = internalCluster().startDataOnlyNode();
Expand Down Expand Up @@ -368,6 +367,9 @@ protected void doRun() throws Exception {
request.pragmas(randomPragmas());
PlainActionFuture<EsqlQueryResponse> future = new PlainActionFuture<>();
client.execute(EsqlQueryAction.INSTANCE, request, future);
ExchangeService exchangeService = internalCluster().getInstance(ExchangeService.class, dataNode);
boolean waitedForPages;
final String sessionId;
try {
List<TaskInfo> foundTasks = new ArrayList<>();
assertBusy(() -> {
Expand All @@ -381,19 +383,25 @@ protected void doRun() throws Exception {
assertThat(tasks, hasSize(1));
foundTasks.addAll(tasks);
});
String sessionId = foundTasks.get(0).taskId().toString();
ExchangeService exchangeService = internalCluster().getInstance(ExchangeService.class, dataNode);
sessionId = foundTasks.get(0).taskId().toString();
assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES));
ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(sessionId);
if (randomBoolean()) {
// do not fail exchange requests when we have some pages
waitedForPages = randomBoolean();
if (waitedForPages) {
// do not fail exchange requests until we have some pages
assertBusy(() -> assertThat(exchangeSink.bufferSize(), greaterThan(0)));
}
} finally {
allowedFetching.countDown();
}
Exception failure = expectThrows(Exception.class, () -> future.actionGet().close());
assertThat(failure.getMessage(), containsString("failed to fetch pages"));
// If we proceed without waiting for pages, we might cancel the main request before starting the data-node request.
// As a result, the exchange sinks on data-nodes won't be removed until the inactive_timeout elapses, which is
// longer than the assertBusy timeout.
if (waitedForPages == false) {
exchangeService.finishSinkHandler(sessionId, failure);
}
} finally {
transportService.clearAllRules();
}
Expand Down

0 comments on commit 8cbece4

Please sign in to comment.