diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 7e3ab212668b..4766f6ca2b68 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -636,7 +636,6 @@ public LocalExecutionPlan plan( .collect(toImmutableList()); context.addDriverFactory( - context.isInputDriver(), true, new PhysicalOperation( outputOperatorFactory.createOutputOperator( @@ -646,7 +645,7 @@ public LocalExecutionPlan plan( pagePreprocessor, new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), isExchangeCompressionEnabled(session))), physicalOperation), - context.getDriverInstanceCount()); + context); // notify operator factories that planning has completed context.getDriverFactories().stream() @@ -697,8 +696,10 @@ private LocalExecutionPlanContext( this.nextPipelineId = nextPipelineId; } - public void addDriverFactory(boolean inputDriver, boolean outputDriver, PhysicalOperation physicalOperation, OptionalInt driverInstances) + public void addDriverFactory(boolean outputDriver, PhysicalOperation physicalOperation, LocalExecutionPlanContext context) { + boolean inputDriver = context.isInputDriver(); + OptionalInt driverInstances = context.getDriverInstanceCount(); List operatorFactoriesWithTypes = physicalOperation.getOperatorFactoriesWithTypes(); addLookupOuterDrivers(outputDriver, toOperatorFactories(operatorFactoriesWithTypes)); List operatorFactories; @@ -2663,10 +2664,9 @@ private PhysicalOperation createNestedLoopJoin(JoinNode node, Set outputMappings = ImmutableMap.builder(); @@ -2794,10 +2794,9 @@ private PagesSpatialIndexFactory createPagesSpatialIndexFactory( pagesIndexFactory); context.addDriverFactory( - buildContext.isInputDriver(), false, new PhysicalOperation(builderOperatorFactory, buildSource), - buildContext.getDriverInstanceCount()); + buildContext); return builderOperatorFactory.getPagesSpatialIndexFactory(); } @@ -2942,10 +2941,9 @@ private PhysicalOperation createLookupJoin( taskConcurrency / partitionCount)); context.addDriverFactory( - buildContext.isInputDriver(), false, new PhysicalOperation(hashBuilderOperatorFactory, buildSource), - buildContext.getDriverInstanceCount()); + buildContext); JoinOperatorType joinType = JoinOperatorType.ofJoinNodeType(node.getType(), outputSingleMatch, waitForBuild); operator = operatorFactories.spillingJoin( @@ -2995,10 +2993,9 @@ private PhysicalOperation createLookupJoin( taskConcurrency / partitionCount)); context.addDriverFactory( - buildContext.isInputDriver(), false, new PhysicalOperation(hashBuilderOperatorFactory, buildSource), - buildContext.getDriverInstanceCount()); + buildContext); JoinOperatorType joinType = JoinOperatorType.ofJoinNodeType(node.getType(), outputSingleMatch, waitForBuild); operator = operatorFactories.join( @@ -3241,10 +3238,9 @@ public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanCont blockTypeOperators); SetSupplier setProvider = setBuilderOperatorFactory.getSetProvider(); context.addDriverFactory( - buildContext.isInputDriver(), false, new PhysicalOperation(setBuilderOperatorFactory, buildSource), - buildContext.getDriverInstanceCount()); + buildContext); // Source channels are always laid out first, followed by the boolean output symbol Map outputMappings = ImmutableMap.builder() @@ -3635,7 +3631,6 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan List expectedLayout = node.getInputs().get(0); Function pagePreprocessor = enforceLoadedLayoutProcessor(expectedLayout, source.getLayout()); context.addDriverFactory( - subContext.isInputDriver(), false, new PhysicalOperation( new LocalExchangeSinkOperatorFactory( @@ -3644,7 +3639,7 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan node.getId(), pagePreprocessor), source), - subContext.getDriverInstanceCount()); + subContext); // the main driver is not an input... the exchange sources are the input for the plan context.setInputDriver(false); @@ -3717,7 +3712,6 @@ else if (context.getDriverInstanceCount().isPresent()) { Function pagePreprocessor = enforceLoadedLayoutProcessor(expectedLayout, source.getLayout()); context.addDriverFactory( - subContext.isInputDriver(), false, new PhysicalOperation( new LocalExchangeSinkOperatorFactory( @@ -3726,7 +3720,7 @@ else if (context.getDriverInstanceCount().isPresent()) { node.getId(), pagePreprocessor), source), - subContext.getDriverInstanceCount()); + subContext); } // the main driver is not an input... the exchange sources are the input for the plan