Skip to content

Commit

Permalink
Allow early termination of exchange source (elastic#118129)
Browse files Browse the repository at this point in the history
This change introduces the ability to gracefully terminate the exchange 
source early by instructing all remote exchange sinks to stop their
computations.

1. When sufficient data has been accumulated (e.g., reaching the LIMIT), 
the exchange source signals remote sinks to stop generating new pages,
allowing the query to finish sooner.

2. When users request immediate results, even if they are partial, 
incomplete, or potentially inaccurate.
  • Loading branch information
dnhatn authored Dec 6, 2024
1 parent c580024 commit a04d671
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -23,6 +24,7 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
Expand All @@ -40,10 +42,11 @@

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
Expand Down Expand Up @@ -293,7 +296,7 @@ static final class TransportRemoteSink implements RemoteSink {
final Executor responseExecutor;

final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
final AtomicBoolean finished = new AtomicBoolean(false);
final AtomicReference<SubscribableListener<Void>> completionListenerRef = new AtomicReference<>(null);

TransportRemoteSink(
TransportService transportService,
Expand All @@ -318,13 +321,14 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
return;
}
// already finished
if (finished.get()) {
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
SubscribableListener<Void> completionListener = completionListenerRef.get();
if (completionListener != null) {
completionListener.addListener(listener.map(unused -> new ExchangeResponse(blockFactory, null, true)));
return;
}
doFetchPageAsync(false, ActionListener.wrap(r -> {
if (r.finished()) {
finished.set(true);
completionListenerRef.compareAndSet(null, SubscribableListener.newSucceeded(null));
}
listener.onResponse(r);
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));
Expand Down Expand Up @@ -356,10 +360,19 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<Exchang

@Override
public void close(ActionListener<Void> listener) {
if (finished.compareAndSet(false, true)) {
doFetchPageAsync(true, listener.delegateFailure((l, unused) -> l.onResponse(null)));
} else {
listener.onResponse(null);
final SubscribableListener<Void> candidate = new SubscribableListener<>();
final SubscribableListener<Void> actual = completionListenerRef.updateAndGet(
curr -> Objects.requireNonNullElse(curr, candidate)
);
actual.addListener(listener);
if (candidate == actual) {
doFetchPageAsync(true, ActionListener.wrap(r -> {
final Page page = r.takePage();
if (page != null) {
page.releaseBlocks();
}
candidate.onResponse(null);
}, e -> candidate.onResponse(null)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.compute.operator.IsBlockedResult;
import org.elasticsearch.core.Releasable;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -41,6 +43,9 @@ public final class ExchangeSourceHandler {
// The final failure collected will be notified to callers via the {@code completionListener}.
private final FailureCollector failure = new FailureCollector();

private final AtomicInteger nextSinkId = new AtomicInteger();
private final Map<Integer, RemoteSink> remoteSinks = ConcurrentCollections.newConcurrentMap();

/**
* Creates a new ExchangeSourceHandler.
*
Expand All @@ -53,7 +58,9 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
this.buffer = new ExchangeBuffer(maxBufferSize);
this.fetchExecutor = fetchExecutor;
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
this.outstandingSources = new PendingInstances(() -> buffer.finish(true));
final PendingInstances closingSinks = new PendingInstances(() -> {});
closingSinks.trackNewInstance();
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.running(closingSinks::finishInstance)));
buffer.addCompletionListener(ActionListener.running(() -> {
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
Expand All @@ -64,6 +71,7 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
listener.onResponse(null);
}
})) {
closingSinks.completion.addListener(refs.acquireListener());
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
// Create an outstanding instance and then finish to complete the completionListener
// if we haven't registered any instances of exchange sinks or exchange sources before.
Expand Down Expand Up @@ -257,7 +265,11 @@ void onSinkComplete() {
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
*/
public void addRemoteSink(RemoteSink remoteSink, boolean failFast, int instances, ActionListener<Void> listener) {
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(ActionListener.notifyOnce(listener));
final int sinkId = nextSinkId.incrementAndGet();
remoteSinks.put(sinkId, remoteSink);
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
ActionListener.notifyOnce(ActionListener.runBefore(listener, () -> remoteSinks.remove(sinkId)))
);
fetchExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
Expand Down Expand Up @@ -291,6 +303,22 @@ public Releasable addEmptySink() {
return outstandingSinks::finishInstance;
}

/**
* Gracefully terminates the exchange source early by instructing all remote exchange sinks to stop their computations.
* This can happen when the exchange source has accumulated enough data (e.g., reaching the LIMIT) or when users want to
* see the current result immediately.
*
* @param drainingPages whether to discard pages already fetched in the exchange
*/
public void finishEarly(boolean drainingPages, ActionListener<Void> listener) {
buffer.finish(drainingPages);
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(listener)) {
for (RemoteSink remoteSink : remoteSinks.values()) {
remoteSink.close(refs.acquire());
}
}
}

private static class PendingInstances {
private final AtomicInteger instances = new AtomicInteger();
private final SubscribableListener<Void> completion = new SubscribableListener<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@
package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.compute.data.Page;

public interface RemoteSink {

void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener);

default void close(ActionListener<Void> listener) {
fetchPageAsync(true, listener.delegateFailure((l, r) -> {
try {
r.close();
} finally {
l.onResponse(null);
final Page page = r.takePage();
if (page != null) {
page.releaseBlocks();
}
l.onResponse(null);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -421,7 +423,7 @@ public void testExchangeSourceContinueOnFailure() {
}
}

public void testEarlyTerminate() {
public void testClosingSinks() {
BlockFactory blockFactory = blockFactory();
IntBlock block1 = blockFactory.newConstantIntBlockWith(1, 2);
IntBlock block2 = blockFactory.newConstantIntBlockWith(1, 2);
Expand All @@ -441,6 +443,57 @@ public void testEarlyTerminate() {
assertTrue(sink.isFinished());
}

public void testFinishEarly() throws Exception {
ExchangeSourceHandler sourceHandler = new ExchangeSourceHandler(20, threadPool.generic(), ActionListener.noop());
Semaphore permits = new Semaphore(between(1, 5));
BlockFactory blockFactory = blockFactory();
Queue<Page> pages = ConcurrentCollections.newQueue();
ExchangeSource exchangeSource = sourceHandler.createExchangeSource();
AtomicBoolean sinkClosed = new AtomicBoolean();
PlainActionFuture<Void> sinkCompleted = new PlainActionFuture<>();
sourceHandler.addRemoteSink((allSourcesFinished, listener) -> {
if (allSourcesFinished) {
sinkClosed.set(true);
permits.release(10);
listener.onResponse(new ExchangeResponse(blockFactory, null, sinkClosed.get()));
} else {
try {
if (permits.tryAcquire(between(0, 100), TimeUnit.MICROSECONDS)) {
boolean closed = sinkClosed.get();
final Page page;
if (closed) {
page = new Page(blockFactory.newConstantIntBlockWith(1, 1));
pages.add(page);
} else {
page = null;
}
listener.onResponse(new ExchangeResponse(blockFactory, page, closed));
} else {
listener.onResponse(new ExchangeResponse(blockFactory, null, sinkClosed.get()));
}
} catch (Exception e) {
throw new AssertionError(e);
}
}
}, false, between(1, 3), sinkCompleted);
threadPool.schedule(
() -> sourceHandler.finishEarly(randomBoolean(), ActionListener.noop()),
TimeValue.timeValueMillis(between(0, 10)),
threadPool.generic()
);
sinkCompleted.actionGet();
Page p;
while ((p = exchangeSource.pollPage()) != null) {
assertSame(p, pages.poll());
p.releaseBlocks();
}
while ((p = pages.poll()) != null) {
p.releaseBlocks();
}
assertTrue(exchangeSource.isFinished());
exchangeSource.finish();
}

public void testConcurrentWithTransportActions() {
MockTransportService node0 = newTransportService();
ExchangeService exchange0 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory());
Expand Down

0 comments on commit a04d671

Please sign in to comment.