From 516abbe81f08c2f386bce978135779bb183a74d3 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 9 Dec 2024 10:23:22 -0800 Subject: [PATCH] Capture partial status --- .../compute/operator/exchange/ExchangeSourceHandler.java | 8 ++++++++ .../xpack/esql/action/EsqlExecutionInfo.java | 2 ++ .../elasticsearch/xpack/esql/plugin/ComputeService.java | 3 +++ 3 files changed, 13 insertions(+) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java index aa722695b841e..ee442a3fb240b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java @@ -45,6 +45,7 @@ public final class ExchangeSourceHandler { private final AtomicInteger nextSinkId = new AtomicInteger(); private final Map remoteSinks = ConcurrentCollections.newConcurrentMap(); + private Runnable finishEarlyHandler; /** * Creates a new ExchangeSourceHandler. @@ -83,6 +84,10 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi })); } + public void onFinishEarly(Runnable finishEarlyHandler) { + this.finishEarlyHandler = finishEarlyHandler; + } + private class ExchangeSourceImpl implements ExchangeSource { private boolean finished; @@ -311,6 +316,9 @@ public Releasable addEmptySink() { * @param drainingPages whether to discard pages already fetched in the exchange */ public void finishEarly(boolean drainingPages, ActionListener listener) { + if (finishEarlyHandler != null) { + finishEarlyHandler.run(); + } buffer.finish(drainingPages); try (EsqlRefCountingListener refs = new EsqlRefCountingListener(listener)) { for (RemoteSink remoteSink : remoteSinks.values()) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 8ad668a455575..9bedbac7ee45b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -56,6 +56,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { public static final ParseField FAILED_FIELD = new ParseField("failed"); public static final ParseField DETAILS_FIELD = new ParseField("details"); public static final ParseField TOOK = new ParseField("took"); + public static final ParseField IS_PARTIAL_FIELD = new ParseField("is_partial"); // Map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query // The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search. @@ -241,6 +242,7 @@ public Iterator toXContentChunked(ToXContent.Params params b.field(SKIPPED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SKIPPED)); b.field(PARTIAL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.PARTIAL)); b.field(FAILED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.FAILED)); + b.field(IS_PARTIAL_FIELD.getPreferredName(), isPartial); // each Cluster object defines its own field object name b.xContentObject("details", clusterInfo.values().iterator()); }); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 258160d4f6d08..2777007183919 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -224,6 +224,9 @@ public void execute( transportService.getThreadPool().executor(ThreadPool.Names.SEARCH), ActionListener.runBefore(computeListener.acquireAvoid(), () -> exchangeService.removeExchangeSourceHandler(sessionId)) ); + exchangeSource.onFinishEarly(() -> { + execInfo.setPartial(); + }); exchangeService.addExchangeSourceHandler(sessionId, exchangeSource); try (Releasable ignored = exchangeSource.addEmptySink()) { // run compute on the coordinator