Skip to content

Commit

Permalink
Wait for the listener to complete
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Dec 6, 2024
1 parent 188dedb commit afddc5f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ public ExchangeSourceHandler removeExchangeSourceHandler(String sessionId) {
public void finishSessionEarly(String sessionId, ActionListener<Void> listener) {
ExchangeSourceHandler exchangeSource = removeExchangeSourceHandler(sessionId);
if (exchangeSource != null) {
exchangeSource.finishEarly(true, listener);
exchangeSource.finishEarly(false, listener);
} else {
listener.onResponse(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
Expand All @@ -36,6 +38,7 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

Expand Down Expand Up @@ -123,7 +126,15 @@ private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, Actio
} catch (IOException e) {
throw new ResourceNotFoundException(asyncId + " not found", e);
}
asyncListener.addListener(listener);
exchangeService.finishSessionEarly(sessionID(asyncId), ActionListener.noop());
// Here we will wait for both the response to become available and for the finish operation to complete
var responseHolder = new AtomicReference<EsqlQueryResponse>();
ActionListener<Void> resultListener = listener.delegateFailureIgnoreResponseAndWrap(l -> l.onResponse(responseHolder.get()));
try (var refs = new EsqlRefCountingListener(resultListener)) {
asyncListener.addListener(refs.acquire().map(r -> {
responseHolder.set(r);
return null;
}));
exchangeService.finishSessionEarly(sessionID(asyncId), refs.acquire());
}
}
}

0 comments on commit afddc5f

Please sign in to comment.