Skip to content

Commit

Permalink
Merge branch 'pr/118211' into partial-result-on-demand
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Dec 9, 2024
2 parents 516abbe + 0ce939b commit b53e1f1
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,14 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
long nextStatus = startTime + statusNanos;
int iter = 0;
while (true) {
IsBlockedResult isBlocked = runSingleLoopIteration();
IsBlockedResult isBlocked;
try {
isBlocked = runSingleLoopIteration();
} catch (DriverEarlyTerminationException e) {
drainAndCloseOperators(null);
updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "early termination");
return Operator.NOT_BLOCKED.listener();
}
iter++;
if (isBlocked.listener().isDone() == false) {
updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason());
Expand Down Expand Up @@ -227,6 +234,17 @@ public void close() {
drainAndCloseOperators(null);
}

private void checkForEarlyTermination() throws DriverEarlyTerminationException {
if (activeOperators.size() >= 2 && activeOperators.getLast().isFinished()) {
for (int i = activeOperators.size() - 2; i >= 0; i--) {
Operator op = activeOperators.get(i);
if (op.isFinished() == false) {
throw new DriverEarlyTerminationException();
}
}
}
}

/**
* Abort the driver and wait for it to finish
*/
Expand All @@ -253,6 +271,7 @@ private IsBlockedResult runSingleLoopIteration() {
if (op.isBlocked().listener().isDone() == false) {
continue;
}
checkForEarlyTermination();

if (op.isFinished() == false && nextOp.needsInput()) {
Page page = op.getOutput();
Expand All @@ -263,6 +282,15 @@ private IsBlockedResult runSingleLoopIteration() {
page.releaseBlocks();
} else {
// Non-empty result from the previous operation, move it to the next operation
boolean terminated = true;
try {
checkForEarlyTermination();
terminated = false;
} finally {
if (terminated) {
page.releaseBlocks();
}
}
nextOp.addInput(page);
movedPage = true;
}
Expand Down Expand Up @@ -290,6 +318,7 @@ private IsBlockedResult runSingleLoopIteration() {
itr.remove();
}

checkForEarlyTermination();
// Finish the next operator, which is now the first operator.
if (activeOperators.isEmpty() == false) {
Operator newRootOperator = activeOperators.get(0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

final class DriverEarlyTerminationException extends RuntimeException {
DriverEarlyTerminationException() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ private void assertCancelled(ActionFuture<EsqlQueryResponse> response) throws Ex
"task cancelled",
"request cancelled test cancel",
"parent task was cancelled [test cancel]",
"cancelled on failure"
"cancelled on failure",
"task cancelled [cancelled on failure]"
)
)
);
Expand Down

0 comments on commit b53e1f1

Please sign in to comment.