Skip to content

Commit

Permalink
Simplify addDriverFactory params
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Jul 14, 2023
1 parent 59d4859 commit fb75256
Showing 1 changed file with 11 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,6 @@ public LocalExecutionPlan plan(
.collect(toImmutableList());

context.addDriverFactory(
context.isInputDriver(),
true,
new PhysicalOperation(
outputOperatorFactory.createOutputOperator(
Expand All @@ -646,7 +645,7 @@ public LocalExecutionPlan plan(
pagePreprocessor,
new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), isExchangeCompressionEnabled(session))),
physicalOperation),
context.getDriverInstanceCount());
context);

// notify operator factories that planning has completed
context.getDriverFactories().stream()
Expand Down Expand Up @@ -697,8 +696,10 @@ private LocalExecutionPlanContext(
this.nextPipelineId = nextPipelineId;
}

public void addDriverFactory(boolean inputDriver, boolean outputDriver, PhysicalOperation physicalOperation, OptionalInt driverInstances)
public void addDriverFactory(boolean outputDriver, PhysicalOperation physicalOperation, LocalExecutionPlanContext context)
{
boolean inputDriver = context.isInputDriver();
OptionalInt driverInstances = context.getDriverInstanceCount();
List<OperatorFactoryWithTypes> operatorFactoriesWithTypes = physicalOperation.getOperatorFactoriesWithTypes();
addLookupOuterDrivers(outputDriver, toOperatorFactories(operatorFactoriesWithTypes));
List<OperatorFactory> operatorFactories;
Expand Down Expand Up @@ -2663,10 +2664,9 @@ private PhysicalOperation createNestedLoopJoin(JoinNode node, Set<DynamicFilterI
}

context.addDriverFactory(
buildContext.isInputDriver(),
false,
new PhysicalOperation(nestedLoopBuildOperatorFactory, buildSource),
buildContext.getDriverInstanceCount());
buildContext);

// build output mapping
ImmutableMap.Builder<Symbol, Integer> outputMappings = ImmutableMap.builder();
Expand Down Expand Up @@ -2794,10 +2794,9 @@ private PagesSpatialIndexFactory createPagesSpatialIndexFactory(
pagesIndexFactory);

context.addDriverFactory(
buildContext.isInputDriver(),
false,
new PhysicalOperation(builderOperatorFactory, buildSource),
buildContext.getDriverInstanceCount());
buildContext);

return builderOperatorFactory.getPagesSpatialIndexFactory();
}
Expand Down Expand Up @@ -2942,10 +2941,9 @@ private PhysicalOperation createLookupJoin(
taskConcurrency / partitionCount));

context.addDriverFactory(
buildContext.isInputDriver(),
false,
new PhysicalOperation(hashBuilderOperatorFactory, buildSource),
buildContext.getDriverInstanceCount());
buildContext);

JoinOperatorType joinType = JoinOperatorType.ofJoinNodeType(node.getType(), outputSingleMatch, waitForBuild);
operator = operatorFactories.spillingJoin(
Expand Down Expand Up @@ -2995,10 +2993,9 @@ private PhysicalOperation createLookupJoin(
taskConcurrency / partitionCount));

context.addDriverFactory(
buildContext.isInputDriver(),
false,
new PhysicalOperation(hashBuilderOperatorFactory, buildSource),
buildContext.getDriverInstanceCount());
buildContext);

JoinOperatorType joinType = JoinOperatorType.ofJoinNodeType(node.getType(), outputSingleMatch, waitForBuild);
operator = operatorFactories.join(
Expand Down Expand Up @@ -3241,10 +3238,9 @@ public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanCont
blockTypeOperators);
SetSupplier setProvider = setBuilderOperatorFactory.getSetProvider();
context.addDriverFactory(
buildContext.isInputDriver(),
false,
new PhysicalOperation(setBuilderOperatorFactory, buildSource),
buildContext.getDriverInstanceCount());
buildContext);

// Source channels are always laid out first, followed by the boolean output symbol
Map<Symbol, Integer> outputMappings = ImmutableMap.<Symbol, Integer>builder()
Expand Down Expand Up @@ -3635,7 +3631,6 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan
List<Symbol> expectedLayout = node.getInputs().get(0);
Function<Page, Page> pagePreprocessor = enforceLoadedLayoutProcessor(expectedLayout, source.getLayout());
context.addDriverFactory(
subContext.isInputDriver(),
false,
new PhysicalOperation(
new LocalExchangeSinkOperatorFactory(
Expand All @@ -3644,7 +3639,7 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan
node.getId(),
pagePreprocessor),
source),
subContext.getDriverInstanceCount());
subContext);
// the main driver is not an input... the exchange sources are the input for the plan
context.setInputDriver(false);

Expand Down Expand Up @@ -3717,7 +3712,6 @@ else if (context.getDriverInstanceCount().isPresent()) {
Function<Page, Page> pagePreprocessor = enforceLoadedLayoutProcessor(expectedLayout, source.getLayout());

context.addDriverFactory(
subContext.isInputDriver(),
false,
new PhysicalOperation(
new LocalExchangeSinkOperatorFactory(
Expand All @@ -3726,7 +3720,7 @@ else if (context.getDriverInstanceCount().isPresent()) {
node.getId(),
pagePreprocessor),
source),
subContext.getDriverInstanceCount());
subContext);
}

// the main driver is not an input... the exchange sources are the input for the plan
Expand Down

0 comments on commit fb75256

Please sign in to comment.