Skip to content

Commit

Permalink
Capture partial status
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Dec 9, 2024
1 parent 124e070 commit 516abbe
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public final class ExchangeSourceHandler {

private final AtomicInteger nextSinkId = new AtomicInteger();
private final Map<Integer, RemoteSink> remoteSinks = ConcurrentCollections.newConcurrentMap();
private Runnable finishEarlyHandler;

/**
* Creates a new ExchangeSourceHandler.
Expand Down Expand Up @@ -83,6 +84,10 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
}));
}

public void onFinishEarly(Runnable finishEarlyHandler) {
this.finishEarlyHandler = finishEarlyHandler;
}

private class ExchangeSourceImpl implements ExchangeSource {
private boolean finished;

Expand Down Expand Up @@ -311,6 +316,9 @@ public Releasable addEmptySink() {
* @param drainingPages whether to discard pages already fetched in the exchange
*/
public void finishEarly(boolean drainingPages, ActionListener<Void> listener) {
if (finishEarlyHandler != null) {
finishEarlyHandler.run();
}
buffer.finish(drainingPages);
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(listener)) {
for (RemoteSink remoteSink : remoteSinks.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
public static final ParseField FAILED_FIELD = new ParseField("failed");
public static final ParseField DETAILS_FIELD = new ParseField("details");
public static final ParseField TOOK = new ParseField("took");
public static final ParseField IS_PARTIAL_FIELD = new ParseField("is_partial");

// Map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
// The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search.
Expand Down Expand Up @@ -241,6 +242,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
b.field(SKIPPED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SKIPPED));
b.field(PARTIAL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.PARTIAL));
b.field(FAILED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.FAILED));
b.field(IS_PARTIAL_FIELD.getPreferredName(), isPartial);
// each Cluster object defines its own field object name
b.xContentObject("details", clusterInfo.values().iterator());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ public void execute(
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
ActionListener.runBefore(computeListener.acquireAvoid(), () -> exchangeService.removeExchangeSourceHandler(sessionId))
);
exchangeSource.onFinishEarly(() -> {
execInfo.setPartial();
});
exchangeService.addExchangeSourceHandler(sessionId, exchangeSource);
try (Releasable ignored = exchangeSource.addEmptySink()) {
// run compute on the coordinator
Expand Down

0 comments on commit 516abbe

Please sign in to comment.