Skip to content

Commit

Permalink
Unblock driver when one of the operator finishes
Browse files Browse the repository at this point in the history
As described in #4065
highly selective limit queries might not finish when
required number of rows is produced. This is due to the
case described in prestodb/presto#9357.
This problem is solved by allowing LocalExchangeSinkOperator to
notify driver when operator becomes finished.
  • Loading branch information
sopel39 committed Aug 12, 2022
1 parent c1c7cff commit 33ad5ec
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 12 deletions.
5 changes: 5 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,11 @@ private ListenableFuture<Void> processInternal(OperationTimer operationTimer)
}

if (!blockedFutures.isEmpty()) {
// allow for operators to unblock drivers when they become finished
for (Operator operator : activeOperators) {
operator.getOperatorContext().getFinishedFuture().ifPresent(blockedFutures::add);
}

// unblock when the first future is complete
ListenableFuture<Void> blocked = firstFinishedFuture(blockedFutures);
// driver records serial blocked time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class OperatorContext
private final AtomicReference<SettableFuture<Void>> memoryFuture;
private final AtomicReference<SettableFuture<Void>> revocableMemoryFuture;
private final AtomicReference<BlockedMonitor> blockedMonitor = new AtomicReference<>();
private final AtomicReference<ListenableFuture<Void>> finishedFuture = new AtomicReference<>();
private final AtomicLong blockedWallNanos = new AtomicLong();

private final OperationTiming finishTiming = new OperationTiming();
Expand Down Expand Up @@ -241,6 +242,16 @@ public void setLatestConnectorMetrics(Metrics metrics)
this.connectorMetrics.set(metrics);
}

Optional<ListenableFuture<Void>> getFinishedFuture()
{
return Optional.ofNullable(finishedFuture.get());
}

public void setFinishedFuture(ListenableFuture<Void> finishedFuture)
{
checkState(this.finishedFuture.getAndSet(requireNonNull(finishedFuture, "finishedFuture is null")) == null, "finishedFuture already set");
}

public void recordPhysicalWrittenData(long sizeInBytes)
{
physicalWrittenDataSize.getAndAdd(sizeInBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
package io.trino.operator.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.trino.spi.Page;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static io.trino.operator.Operator.NOT_BLOCKED;
Expand All @@ -35,7 +35,7 @@ public static LocalExchangeSink finishedLocalExchangeSink()
private final LocalExchanger exchanger;
private final Consumer<LocalExchangeSink> onFinish;

private final AtomicBoolean finished = new AtomicBoolean();
private final SettableFuture<Void> finished = SettableFuture.create();

public LocalExchangeSink(
LocalExchanger exchanger,
Expand All @@ -47,15 +47,15 @@ public LocalExchangeSink(

public void finish()
{
if (finished.compareAndSet(false, true)) {
if (finished.set(null)) {
exchanger.finish();
onFinish.accept(this);
}
}

public boolean isFinished()
public ListenableFuture<Void> isFinished()
{
return finished.get();
return finished;
}

public void addPage(Page page)
Expand All @@ -65,7 +65,7 @@ public void addPage(Page page)
// ignore pages after finished
// this can happen with limit queries when all of the source (readers) are closed, so sinks
// can be aborted early
if (isFinished()) {
if (isFinished().isDone()) {
return;
}

Expand All @@ -76,7 +76,7 @@ public void addPage(Page page)

public ListenableFuture<Void> waitForWriting()
{
if (isFinished()) {
if (isFinished().isDone()) {
return NOT_BLOCKED;
}
return exchanger.waitForWriting();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void localPlannerComplete()
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.sink = requireNonNull(sink, "sink is null");
this.pagePreprocessor = requireNonNull(pagePreprocessor, "pagePreprocessor is null");
operatorContext.setFinishedFuture(sink.isFinished());
}

@Override
Expand All @@ -106,7 +107,7 @@ public void finish()
@Override
public boolean isFinished()
{
return sink.isFinished();
return sink.isFinished().isDone();
}

@Override
Expand Down
71 changes: 71 additions & 0 deletions core/trino-main/src/test/java/io/trino/operator/TestDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
Expand Down Expand Up @@ -254,6 +255,30 @@ public void testMemoryRevocationRace()
assertTrue(driver.processForDuration(new Duration(100, TimeUnit.MILLISECONDS)).isDone());
}

@Test
public void testUnblocksOnFinish()
{
List<Type> types = ImmutableList.of(VARCHAR, BIGINT, BIGINT);
TableScanOperator source = new AlwaysBlockedTableScanOperator(
driverContext.addOperatorContext(99, new PlanNodeId("test"), "scan"),
new PlanNodeId("source"),
(session, split, table, columns, dynamicFilter) -> new FixedPageSource(rowPagesBuilder(types)
.addSequencePage(10, 20, 30, 40)
.build()),
TEST_TABLE_HANDLE,
ImmutableList.of());

MaterializedResult.Builder resultBuilder = MaterializedResult.resultBuilder(driverContext.getSession(), types);
BlockedSinkOperator sink = new BlockedSinkOperator(driverContext.addOperatorContext(1, new PlanNodeId("test"), "sink"), resultBuilder::page, Function.identity());
Driver driver = Driver.createDriver(driverContext, source, sink);

ListenableFuture<Void> blocked = driver.processForDuration(new Duration(100, TimeUnit.MILLISECONDS));
assertFalse(blocked.isDone());

sink.setFinished();
assertTrue(blocked.isDone());
}

@Test
public void testBrokenOperatorAddSource()
{
Expand Down Expand Up @@ -418,6 +443,52 @@ public void close()
}
}

private static class BlockedSinkOperator
extends PageConsumerOperator
{
private final SettableFuture<Void> finished = SettableFuture.create();

public BlockedSinkOperator(
OperatorContext operatorContext,
Consumer<Page> pageConsumer,
Function<Page, Page> pagePreprocessor)
{
super(operatorContext, pageConsumer, pagePreprocessor);
operatorContext.setFinishedFuture(finished);
}

@Override
public boolean isFinished()
{
return finished.isDone();
}

void setFinished()
{
finished.set(null);
}
}

private static class AlwaysBlockedTableScanOperator
extends TableScanOperator
{
public AlwaysBlockedTableScanOperator(
OperatorContext operatorContext,
PlanNodeId planNodeId,
PageSourceProvider pageSourceProvider,
TableHandle table,
Iterable<ColumnHandle> columns)
{
super(operatorContext, planNodeId, pageSourceProvider, table, columns, DynamicFilter.EMPTY);
}

@Override
public ListenableFuture<Void> isBlocked()
{
return SettableFuture.create();
}
}

private static class AlwaysBlockedMemoryRevokingTableScanOperator
extends TableScanOperator
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,14 @@ public void writeUnblockWhenAllReadersFinishAndPagesConsumed()
sinkFactory.noMoreSinkFactories();
LocalExchangeSink sinkA = sinkFactory.createSink();
assertSinkCanWrite(sinkA);
ListenableFuture<Void> sinkAFinished = sinkA.isFinished();
assertFalse(sinkAFinished.isDone());

LocalExchangeSink sinkB = sinkFactory.createSink();
assertSinkCanWrite(sinkB);
ListenableFuture<Void> sinkBFinished = sinkB.isFinished();
assertFalse(sinkBFinished.isDone());

sinkFactory.close();

LocalExchangeSource sourceA = exchange.getSource(0);
Expand Down Expand Up @@ -620,6 +626,8 @@ public void writeUnblockWhenAllReadersFinishAndPagesConsumed()

assertTrue(sinkAFuture.isDone());
assertTrue(sinkBFuture.isDone());
assertTrue(sinkAFinished.isDone());
assertTrue(sinkBFinished.isDone());

assertSinkFinished(sinkA);
assertSinkFinished(sinkB);
Expand Down Expand Up @@ -692,26 +700,26 @@ private static void assertPartitionedRemovePage(LocalExchangeSource source, int

private static void assertSinkCanWrite(LocalExchangeSink sink)
{
assertFalse(sink.isFinished());
assertFalse(sink.isFinished().isDone());
assertTrue(sink.waitForWriting().isDone());
}

private static ListenableFuture<Void> assertSinkWriteBlocked(LocalExchangeSink sink)
{
assertFalse(sink.isFinished());
assertFalse(sink.isFinished().isDone());
ListenableFuture<Void> writeFuture = sink.waitForWriting();
assertFalse(writeFuture.isDone());
return writeFuture;
}

private static void assertSinkFinished(LocalExchangeSink sink)
{
assertTrue(sink.isFinished());
assertTrue(sink.isFinished().isDone());
assertTrue(sink.waitForWriting().isDone());

// this will be ignored
sink.addPage(createPage(0));
assertTrue(sink.isFinished());
assertTrue(sink.isFinished().isDone());
assertTrue(sink.waitForWriting().isDone());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.testing.FaultTolerantExecutionConnectorTestHelper;
import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;
import org.testng.annotations.Test;

import static io.airlift.testing.Closeables.closeAllSuppress;

Expand Down Expand Up @@ -58,4 +59,11 @@ protected QueryRunner createQueryRunner()
}
return queryRunner;
}

@Override
@Test(enabled = false)
public void testSelectiveLimit()
{
// FTE mode does not terminate query when limit is reached
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,16 @@ public void testQueryTransitionsToRunningState()

assertThatThrownBy(queryFuture::get).hasMessageContaining("Query was canceled");
}

@Test(timeOut = 30_000)
public void testSelectiveLimit()
{
assertQuery("" +
"SELECT * FROM (" +
" (SELECT orderkey AS a FROM tpch.sf10000.orders WHERE orderkey=-1)" +
" UNION ALL SELECT * FROM (values -1) AS t(a))" +
"WHERE a=-1 " +
"LIMIT 1",
"VALUES -1");
}
}

0 comments on commit 33ad5ec

Please sign in to comment.