Skip to content

Commit

Permalink
Transition task to FINISHED after noMoreOperators is set
Browse files Browse the repository at this point in the history
Currently dynamic filters are delivered via DynamicFilterFetcher.
When a TaskStatus contains an updated dynamic filter version the
DynamicFilterFetcher is responsible for fetching the new dynamic
filters from a worker. However tasks were getting transitioned
to FINISHED before the LocalDynamicFilterConsumer#setPartitionCount
is set and dynamic filters updated. When task is transitioned
to FINISHED the TaskStatusFetcher no longer tries to update TaskStatus
so the coordinator is not able to learn that there is dynamic filters to
be fetched.
  • Loading branch information
arhimondr committed Sep 7, 2022
1 parent 04de92c commit 24290a0
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static io.trino.SystemSessionProperties.getInitialSplitsPerNode;
import static io.trino.SystemSessionProperties.getMaxDriversPerTask;
Expand Down Expand Up @@ -384,6 +385,7 @@ private void scheduleDriversForTaskLifeCycle()
driverRunnerFactory.noMoreDriverRunner();
verify(driverRunnerFactory.isNoMoreDriverRunner());
}
checkTaskCompletion();
}

private synchronized void enqueueDriverSplitRunner(boolean forceRunSplit, List<DriverSplitRunner> runners)
Expand Down Expand Up @@ -469,9 +471,9 @@ private synchronized void checkTaskCompletion()
return;
}

// are there more partition splits expected?
for (DriverSplitRunnerFactory driverSplitRunnerFactory : driverRunnerFactoriesWithSplitLifeCycle.values()) {
if (!driverSplitRunnerFactory.isNoMoreDriverRunner()) {
// are there more drivers expected?
for (DriverSplitRunnerFactory driverSplitRunnerFactory : concat(driverRunnerFactoriesWithTaskLifeCycle, driverRunnerFactoriesWithSplitLifeCycle.values())) {
if (!driverSplitRunnerFactory.isNoMoreDrivers()) {
return;
}
}
Expand Down Expand Up @@ -661,6 +663,11 @@ public void closeDriverFactoryIfFullyCreated()
}
}

public boolean isNoMoreDrivers()
{
return driverFactory.isNoMoreDrivers();
}

public OptionalInt getDriverInstances()
{
return driverFactory.getDriverInstances();
Expand Down

0 comments on commit 24290a0

Please sign in to comment.