From 0ce939bf5cc28ddcbd78c878c1b8bd2d41960e2c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 7 Dec 2024 10:20:09 -0800 Subject: [PATCH] Allow early termination in Driver --- .../compute/operator/Driver.java | 31 ++++++++++++++++++- .../DriverEarlyTerminationException.java | 12 +++++++ .../xpack/esql/action/EsqlActionTaskIT.java | 3 +- 3 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverEarlyTerminationException.java diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index acbf8a17b31fd..03193139e175d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -186,7 +186,14 @@ SubscribableListener run(TimeValue maxTime, int maxIterations, LongSupplie long nextStatus = startTime + statusNanos; int iter = 0; while (true) { - IsBlockedResult isBlocked = runSingleLoopIteration(); + IsBlockedResult isBlocked; + try { + isBlocked = runSingleLoopIteration(); + } catch (DriverEarlyTerminationException e) { + drainAndCloseOperators(null); + updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "early termination"); + return Operator.NOT_BLOCKED.listener(); + } iter++; if (isBlocked.listener().isDone() == false) { updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason()); @@ -227,6 +234,17 @@ public void close() { drainAndCloseOperators(null); } + private void checkForEarlyTermination() throws DriverEarlyTerminationException { + if (activeOperators.size() >= 2 && activeOperators.getLast().isFinished()) { + for (int i = activeOperators.size() - 2; i >= 0; i--) { + Operator op = activeOperators.get(i); + if (op.isFinished() == false) { + throw new DriverEarlyTerminationException(); + } + } + } + } + /** * Abort the driver and wait for it to finish */ @@ -253,6 +271,7 @@ private IsBlockedResult runSingleLoopIteration() { if (op.isBlocked().listener().isDone() == false) { continue; } + checkForEarlyTermination(); if (op.isFinished() == false && nextOp.needsInput()) { Page page = op.getOutput(); @@ -263,6 +282,15 @@ private IsBlockedResult runSingleLoopIteration() { page.releaseBlocks(); } else { // Non-empty result from the previous operation, move it to the next operation + boolean terminated = true; + try { + checkForEarlyTermination(); + terminated = false; + } finally { + if (terminated) { + page.releaseBlocks(); + } + } nextOp.addInput(page); movedPage = true; } @@ -290,6 +318,7 @@ private IsBlockedResult runSingleLoopIteration() { itr.remove(); } + checkForEarlyTermination(); // Finish the next operator, which is now the first operator. if (activeOperators.isEmpty() == false) { Operator newRootOperator = activeOperators.get(0); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverEarlyTerminationException.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverEarlyTerminationException.java new file mode 100644 index 0000000000000..dfa5fd3d0f246 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverEarlyTerminationException.java @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +final class DriverEarlyTerminationException extends RuntimeException { + DriverEarlyTerminationException() {} +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index abd4f6b49d7b4..208f50467c9d0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -345,7 +345,8 @@ private void assertCancelled(ActionFuture response) throws Ex "task cancelled", "request cancelled test cancel", "parent task was cancelled [test cancel]", - "cancelled on failure" + "cancelled on failure", + "task cancelled [cancelled on failure]" ) ) );