Skip to content

Commit

Permalink
Introduce TableScanNode#useConnectorNodePartitioning
Browse files Browse the repository at this point in the history
This commit introduces TableScanNode#useConnectorNodePartitioning
which will be used by optimizer rule to specify if node partitioning
should be used for table scan.
  • Loading branch information
sopel39 committed Mar 22, 2021
1 parent eef0153 commit b2f1f69
Show file tree
Hide file tree
Showing 25 changed files with 116 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ private RelationPlan createAnalyzePlan(Analysis analysis, Analyze analyzeStateme
idAllocator.getNextId(),
new AggregationNode(
idAllocator.getNextId(),
TableScanNode.newInstance(idAllocator.getNextId(), targetTable, tableScanOutputs.build(), symbolToColumnHandle.build(), false),
TableScanNode.newInstance(idAllocator.getNextId(), targetTable, tableScanOutputs.build(), symbolToColumnHandle.build(), false, Optional.empty()),
statisticAggregations.getAggregations(),
singleGroupingSet(groupingSymbols),
ImmutableList.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public PlanNode visitSample(SampleNode node, RewriteContext<Void> context)
@Override
public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
{
return new TableScanNode(idAllocator.getNextId(), node.getTable(), node.getOutputSymbols(), node.getAssignments(), node.getEnforcedConstraint(), node.isUpdateTarget());
return new TableScanNode(idAllocator.getNextId(), node.getTable(), node.getOutputSymbols(), node.getAssignments(), node.getEnforcedConstraint(), node.isUpdateTarget(), node.getUseConnectorNodePartitioning());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,10 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
node.getOutputSymbols(),
node.getAssignments(),
node.getEnforcedConstraint(),
node.isUpdateTarget());
node.isUpdateTarget(),
// plan was already fragmented with scan node's partitioning
// and new partitioning is compatible with previous one
node.getUseConnectorNodePartitioning());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ protected RelationPlan visitTable(Table node, Void context)

List<Symbol> outputSymbols = outputSymbolsBuilder.build();
boolean updateTarget = analysis.isUpdateTarget(node);
PlanNode root = TableScanNode.newInstance(idAllocator.getNextId(), handle, outputSymbols, columns.build(), updateTarget);
PlanNode root = TableScanNode.newInstance(idAllocator.getNextId(), handle, outputSymbols, columns.build(), updateTarget, Optional.empty());

plan = new RelationPlan(root, scope, outputSymbols, outerContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ public Result apply(TableScanNode scanNode, Captures captures, Context context)
scanNode.getOutputSymbols(),
newAssignments,
TupleDomain.all(),
scanNode.isUpdateTarget()));
scanNode.isUpdateTarget(),
Optional.empty()));
}

ImmutableMap.Builder<Symbol, ColumnHandle> newAssignmentsBuilder = ImmutableMap.<Symbol, ColumnHandle>builder()
Expand Down Expand Up @@ -189,7 +190,8 @@ public Result apply(TableScanNode scanNode, Captures captures, Context context)
newOutputSymbols,
newAssignmentsBuilder.build(),
TupleDomain.all(),
scanNode.isUpdateTarget());
scanNode.isUpdateTarget(),
Optional.empty());

FilterNode filterNode = new FilterNode(
context.getIdAllocator().getNextId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public static Optional<PlanNode> pruneColumns(Metadata metadata, TypeProvider ty
newOutputs,
newAssignments,
enforcedConstraint,
node.isUpdateTarget()));
node.isUpdateTarget(),
node.getUseConnectorNodePartitioning()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ public static Optional<PlanNode> pushAggregationIntoTableScan(
result.getHandle(),
newScanOutputs.build(),
scanAssignments,
tableScan.isUpdateTarget()),
tableScan.isUpdateTarget(),
// table scan partitioning might have changed with new table handle
Optional.empty()),
assignmentBuilder.build()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ public Result apply(JoinNode joinNode, Captures captures, Context context)
ImmutableList.copyOf(assignments.keySet()),
assignments,
newEnforcedConstraint,
false),
false,
Optional.empty()),
Assignments.identity(joinNode.getOutputSymbols())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.TableScanNode;

import java.util.Optional;

import static io.trino.SystemSessionProperties.isAllowPushdownIntoConnectors;
import static io.trino.matching.Capture.newCapture;
import static io.trino.sql.planner.plan.Patterns.limit;
Expand Down Expand Up @@ -70,7 +72,9 @@ public Rule.Result apply(LimitNode limit, Captures captures, Rule.Context contex
tableScan.getOutputSymbols(),
tableScan.getAssignments(),
tableScan.getEnforcedConstraint(),
tableScan.isUpdateTarget());
tableScan.isUpdateTarget(),
// table scan partitioning might have changed with new table handle
Optional.empty());

if (!result.isLimitGuaranteed()) {
node = new LimitNode(limit.getId(), node, limit.getCount(), limit.isPartial());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
node.getOutputSymbols(),
node.getAssignments(),
computeEnforced(newDomain, remainingFilter),
node.isUpdateTarget());
node.isUpdateTarget(),
Optional.empty());

Expression resultingPredicate = createResultingPredicate(
metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ public Result apply(ProjectNode project, Captures captures, Context context)
result.get().getHandle(),
newScanOutputs,
newScanAssignments,
tableScan.isUpdateTarget()),
tableScan.isUpdateTarget(),
Optional.empty()),
newProjectionAssignments.build()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.trino.sql.planner.plan.SampleNode.Type;
import io.trino.sql.planner.plan.TableScanNode;

import java.util.Optional;

import static io.trino.SystemSessionProperties.isAllowPushdownIntoConnectors;
import static io.trino.matching.Capture.newCapture;
import static io.trino.sql.planner.plan.Patterns.Sample.sampleType;
Expand Down Expand Up @@ -70,7 +72,9 @@ public Rule.Result apply(SampleNode sample, Captures captures, Rule.Context cont
tableScan.getOutputSymbols(),
tableScan.getAssignments(),
tableScan.getEnforcedConstraint(),
tableScan.isUpdateTarget())))
tableScan.isUpdateTarget(),
// table scan partitioning might have changed with new table handle
Optional.empty())))
.orElseGet(Result::empty);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.SystemSessionProperties.isAllowPushdownIntoConnectors;
Expand Down Expand Up @@ -86,7 +87,9 @@ public Result apply(TopNNode topNNode, Captures captures, Context context)
result.getHandle(),
tableScan.getOutputSymbols(),
tableScan.getAssignments(),
tableScan.isUpdateTarget());
tableScan.isUpdateTarget(),
// table scan partitioning might have changed with new table handle
Optional.empty());

if (!result.isTopNGuaranteed()) {
node = topNNode.replaceChildren(ImmutableList.of(node));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ private PlanNode rewriteModifyTableScan(PlanNode node, TableHandle handle)
scan.getOutputSymbols(),
scan.getAssignments(),
scan.getEnforcedConstraint(),
scan.isUpdateTarget());
scan.isUpdateTarget(),
// partitioning should not change with write table handle
scan.getUseConnectorNodePartitioning());
}

if (node instanceof FilterNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public PlanAndMappings visitTableScan(TableScanNode node, UnaliasContext context
});

return new PlanAndMappings(
new TableScanNode(node.getId(), node.getTable(), newOutputs, newAssignments, node.getEnforcedConstraint(), node.isUpdateTarget()),
new TableScanNode(node.getId(), node.getTable(), newOutputs, newAssignments, node.getEnforcedConstraint(), node.isUpdateTarget(), node.getUseConnectorNodePartitioning()),
mapping);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.metadata.TableHandle;
Expand All @@ -26,6 +27,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -42,6 +44,7 @@ public class TableScanNode

private final TupleDomain<ColumnHandle> enforcedConstraint;
private final boolean updateTarget;
private final Optional<Boolean> useConnectorNodePartitioning;

// We need this factory method to disambiguate with the constructor used for deserializing
// from a json object. The deserializer sets some fields which are never transported
Expand All @@ -51,9 +54,10 @@ public static TableScanNode newInstance(
TableHandle table,
List<Symbol> outputs,
Map<Symbol, ColumnHandle> assignments,
boolean updateTarget)
boolean updateTarget,
Optional<Boolean> useConnectorNodePartitioning)
{
return new TableScanNode(id, table, outputs, assignments, TupleDomain.all(), updateTarget);
return new TableScanNode(id, table, outputs, assignments, TupleDomain.all(), updateTarget, useConnectorNodePartitioning);
}

/*
Expand All @@ -67,7 +71,8 @@ public TableScanNode(
@JsonProperty("table") TableHandle table,
@JsonProperty("outputSymbols") List<Symbol> outputs,
@JsonProperty("assignments") Map<Symbol, ColumnHandle> assignments,
@JsonProperty("updateTarget") boolean updateTarget)
@JsonProperty("updateTarget") boolean updateTarget,
@JsonProperty("useConnectorNodePartitioning") Optional<Boolean> useConnectorNodePartitioning)
{
super(id);
this.table = requireNonNull(table, "table is null");
Expand All @@ -76,6 +81,7 @@ public TableScanNode(
checkArgument(assignments.keySet().containsAll(outputs), "assignments does not cover all of outputs");
this.enforcedConstraint = null;
this.updateTarget = updateTarget;
this.useConnectorNodePartitioning = requireNonNull(useConnectorNodePartitioning, "useConnectorNodePartitioning is null");
}

public TableScanNode(
Expand All @@ -84,7 +90,8 @@ public TableScanNode(
List<Symbol> outputs,
Map<Symbol, ColumnHandle> assignments,
TupleDomain<ColumnHandle> enforcedConstraint,
boolean updateTarget)
boolean updateTarget,
Optional<Boolean> useConnectorNodePartitioning)
{
super(id);
this.table = requireNonNull(table, "table is null");
Expand All @@ -93,6 +100,7 @@ public TableScanNode(
checkArgument(assignments.keySet().containsAll(outputs), "assignments does not cover all of outputs");
this.enforcedConstraint = requireNonNull(enforcedConstraint, "enforcedConstraint is null");
this.updateTarget = updateTarget;
this.useConnectorNodePartitioning = requireNonNull(useConnectorNodePartitioning, "useTableNodePartitioning is null");
}

@JsonProperty("table")
Expand Down Expand Up @@ -135,6 +143,18 @@ public boolean isUpdateTarget()
return updateTarget;
}

@JsonProperty("useConnectorNodePartitioning")
public Optional<Boolean> getUseConnectorNodePartitioning()
{
return useConnectorNodePartitioning;
}

public boolean isUseConnectorNodePartitioning()
{
return useConnectorNodePartitioning
.orElseThrow(() -> new VerifyException("useConnectorNodePartitioning is not present"));
}

@Override
public List<PlanNode> getSources()
{
Expand Down Expand Up @@ -165,4 +185,16 @@ public PlanNode replaceChildren(List<PlanNode> newChildren)
checkArgument(newChildren.isEmpty(), "newChildren is not empty");
return this;
}

public TableScanNode withUseConnectorNodePartitioning(boolean useConnectorNodePartitioning)
{
return new TableScanNode(
getId(),
table,
outputSymbols,
assignments,
enforcedConstraint,
updateTarget,
Optional.of(useConnectorNodePartitioning));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,8 @@ private TableScanNode tableScan(String id, String... symbols)
symbolsList,
assignments.build(),
TupleDomain.all(),
false);
false,
Optional.empty());
}

private PlanNode project(String id, PlanNode source, String symbol, Expression expression)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public MockRemoteTask createTableScanTask(TaskId taskId, InternalNode newNode, L
TEST_TABLE_HANDLE,
ImmutableList.of(symbol),
ImmutableMap.of(symbol, new TestingColumnHandle("column")),
false),
false,
Optional.empty()),
ImmutableMap.of(symbol, VARCHAR),
SOURCE_DISTRIBUTION,
ImmutableList.of(sourceId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ private TaskTestUtils() {}
TEST_TABLE_HANDLE,
ImmutableList.of(SYMBOL),
ImmutableMap.of(SYMBOL, new TestingColumnHandle("column", 0, BIGINT)),
false),
false,
Optional.empty()),
ImmutableMap.of(SYMBOL, VARCHAR),
SOURCE_DISTRIBUTION,
ImmutableList.of(TABLE_SCAN_NODE_ID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ private static PlanFragment createBroadcastJoinPlanFragment(String name, PlanFra
TEST_TABLE_HANDLE,
ImmutableList.of(symbol),
ImmutableMap.of(symbol, new TestingColumnHandle("column")),
false);
false,
Optional.empty());

RemoteSourceNode remote = new RemoteSourceNode(new PlanNodeId("build_id"), buildFragment.getId(), ImmutableList.of(), Optional.empty(), REPLICATE);
PlanNode join = new JoinNode(
Expand Down Expand Up @@ -236,7 +237,8 @@ private static PlanFragment createTableScanPlanFragment(String name)
TEST_TABLE_HANDLE,
ImmutableList.of(symbol),
ImmutableMap.of(symbol, new TestingColumnHandle("column")),
false);
false,
Optional.empty());

return createFragment(planNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ private static StageExecutionPlan createPlan(ConnectorSplitSource splitSource)
TEST_TABLE_HANDLE,
ImmutableList.of(symbol),
ImmutableMap.of(symbol, new TestingColumnHandle("column")),
false);
false,
Optional.empty());
FilterNode filterNode = new FilterNode(
new PlanNodeId("filter_node_id"),
tableScan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,8 @@ private static PlanFragment createPlan(DynamicFilterId dynamicFilterId, Partitio
TEST_TABLE_HANDLE,
ImmutableList.of(symbol),
ImmutableMap.of(symbol, new TestingMetadata.TestingColumnHandle("column")),
false);
false,
Optional.empty());
FilterNode filterNode = new FilterNode(
new PlanNodeId("filter_node_id"),
tableScan,
Expand Down
Loading

0 comments on commit b2f1f69

Please sign in to comment.