Skip to content

Commit

Permalink
Use cancel reason in ESQL (elastic#103824)
Browse files Browse the repository at this point in the history
ComputeService in ESQL should use the cancel reason
when cancelling its exchanges.

Closes elastic#103746
  • Loading branch information
dnhatn authored Jan 2, 2024
1 parent 05c79cc commit 9a7c624
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ private void createRemoteIndex(int numDocs) throws Exception {
bulk.get();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103746")
public void testCancel() throws Exception {
createRemoteIndex(between(10, 100));
EsqlQueryRequest request = new EsqlQueryRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,9 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
final var parentTask = (CancellableTask) task;
final var sessionId = request.sessionId();
final var exchangeSink = exchangeService.getSinkHandler(sessionId);
parentTask.addListener(() -> exchangeService.finishSinkHandler(sessionId, new TaskCancelledException("task cancelled")));
parentTask.addListener(
() -> exchangeService.finishSinkHandler(sessionId, new TaskCancelledException(parentTask.getReasonCancelled()))
);
final ActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel);
final EsqlConfiguration configuration = request.configuration();
acquireSearchContexts(
Expand Down Expand Up @@ -651,7 +653,9 @@ void runComputeOnRemoteCluster(
ActionListener<ComputeResponse> listener
) {
final var exchangeSink = exchangeService.getSinkHandler(globalSessionId);
parentTask.addListener(() -> exchangeService.finishSinkHandler(globalSessionId, new TaskCancelledException("request cancelled")));
parentTask.addListener(
() -> exchangeService.finishSinkHandler(globalSessionId, new TaskCancelledException(parentTask.getReasonCancelled()))
);
ThreadPool threadPool = transportService.getThreadPool();
final var responseHeadersCollector = new ResponseHeadersCollector(threadPool.getThreadContext());
listener = ActionListener.runBefore(listener, responseHeadersCollector::finish);
Expand Down

0 comments on commit 9a7c624

Please sign in to comment.