Enable preferred write partitioning for FTE by default #14735

Merged · 6 commits · Nov 29, 2022
26 changes: 15 additions & 11 deletions core/trino-main/src/main/java/io/trino/SystemSessionProperties.java

@@ -168,7 +168,6 @@ public final class SystemSessionProperties
     public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY_GROWTH_FACTOR = "fault_tolerant_execution_task_memory_growth_factor";
     public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE = "fault_tolerant_execution_task_memory_estimation_quantile";
     public static final String FAULT_TOLERANT_EXECUTION_PARTITION_COUNT = "fault_tolerant_execution_partition_count";
-    public static final String FAULT_TOLERANT_EXECUTION_PRESERVE_INPUT_PARTITIONS_IN_WRITE_STAGE = "fault_tolerant_execution_preserve_input_partitions_in_write_stage";
     public static final String ADAPTIVE_PARTIAL_AGGREGATION_ENABLED = "adaptive_partial_aggregation_enabled";
     public static final String ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS = "adaptive_partial_aggregation_min_rows";
     public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold";
@@ -177,6 +176,7 @@ public final class SystemSessionProperties
     public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
     public static final String FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED = "fault_tolerant_execution_event_driven_scheduler_enabled";
     public static final String FORCE_FIXED_DISTRIBUTION_FOR_PARTITIONED_OUTPUT_OPERATOR_ENABLED = "force_fixed_distribution_for_partitioned_output_operator_enabled";
+    public static final String FAULT_TOLERANT_EXECUTION_FORCE_PREFERRED_WRITE_PARTITIONING_ENABLED = "fault_tolerant_execution_force_preferred_write_partitioning_enabled";

     private final List<PropertyMetadata<?>> sessionProperties;

Review comment (Member): docs?

Reply (Contributor, author): I would prefer not to document it and drop it after some period. There's no real reason why somebody would want to disable it unless there's a correctness or performance bug.

@@ -832,11 +832,6 @@ public SystemSessionProperties(
                         "Number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled",
                         queryManagerConfig.getFaultTolerantExecutionPartitionCount(),
                         false),
-                booleanProperty(
-                        FAULT_TOLERANT_EXECUTION_PRESERVE_INPUT_PARTITIONS_IN_WRITE_STAGE,
-                        "Ensure single task reads single hash partitioned input partition for stages which write table data",
-                        queryManagerConfig.getFaultTolerantPreserveInputPartitionsInWriteStage(),
-                        false),
                 booleanProperty(
                         ADAPTIVE_PARTIAL_AGGREGATION_ENABLED,
                         "When enabled, partial aggregation might be adaptively turned off when it does not provide any performance gain",
@@ -877,6 +872,11 @@ public SystemSessionProperties(
                         FORCE_FIXED_DISTRIBUTION_FOR_PARTITIONED_OUTPUT_OPERATOR_ENABLED,
                         "Force partitioned output operator to be run with fixed distribution",
                         optimizerConfig.isForceFixedDistributionForPartitionedOutputOperatorEnabled(),
-                        true));
+                        true),
+                booleanProperty(
+                        FAULT_TOLERANT_EXECUTION_FORCE_PREFERRED_WRITE_PARTITIONING_ENABLED,
+                        "Force preferred write partitioning for fault tolerant execution",
+                        queryManagerConfig.isFaultTolerantExecutionForcePreferredWritePartitioningEnabled(),
+                        true));
     }

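For context on the registration above: session properties are declared through trino-spi's PropertyMetadata, and the fourth argument of booleanProperty is the hidden flag. The new toggle is registered with hidden = true, which fits the review thread above about leaving it undocumented. A minimal standalone sketch of that declaration (the printout and class name are illustrative only; the real code takes the default from QueryManagerConfig rather than hardcoding it):

```java
// Sketch only: declaring a hidden boolean session property with trino-spi's
// PropertyMetadata.booleanProperty(name, description, defaultValue, hidden).
import io.trino.spi.session.PropertyMetadata;

import static io.trino.spi.session.PropertyMetadata.booleanProperty;

public class SessionPropertySketch
{
    public static void main(String[] args)
    {
        PropertyMetadata<Boolean> property = booleanProperty(
                "fault_tolerant_execution_force_preferred_write_partitioning_enabled",
                "Force preferred write partitioning for fault tolerant execution",
                true,  // default value (hardcoded here; read from QueryManagerConfig in the PR)
                true); // hidden: excluded from SHOW SESSION output
        System.out.println(property.getName() + ", default = " + property.getDefaultValue());
    }
}
```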
@@ -1521,11 +1521,6 @@ public static double getFaultTolerantExecutionTaskMemoryEstimationQuantile(Session session)
         return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE, Double.class);
     }

-    public static boolean getFaultTolerantPreserveInputPartitionsInWriteStage(Session session)
-    {
-        return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_PRESERVE_INPUT_PARTITIONS_IN_WRITE_STAGE, Boolean.class);
-    }
-
     public static int getFaultTolerantExecutionPartitionCount(Session session)
     {
         return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_PARTITION_COUNT, Integer.class);
@@ -1570,4 +1565,13 @@ public static boolean isForceFixedDistributionForPartitionedOutputOperatorEnabled(Session session)
     {
         return session.getSystemProperty(FORCE_FIXED_DISTRIBUTION_FOR_PARTITIONED_OUTPUT_OPERATOR_ENABLED, Boolean.class);
     }
+
+    public static boolean isFaultTolerantExecutionForcePreferredWritePartitioningEnabled(Session session)
+    {
+        if (!isFaultTolerantExecutionEventDrivenSchedulerEnabled(session)) {
+            // supported only in event driven scheduler
+            return false;
+        }
+        return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_FORCE_PREFERRED_WRITE_PARTITIONING_ENABLED, Boolean.class);
+    }
 }

Review comment (Member), on the early return: should we throw instead?

Reply (Contributor, author): Hopefully we will remove the legacy scheduler soon. I added this more as a precaution in case somebody needs to revert to the old scheduler, which could result in skew issues.
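The getter above silently reports the feature as disabled when the event-driven scheduler is off, rather than throwing (see the exchange above). A self-contained sketch of that gating pattern, using a plain map in place of Trino's Session purely to illustrate the control flow:

```java
// Minimal sketch (not Trino code) of the gating pattern: a feature flag that
// reads as disabled when the scheduler it depends on is turned off.
import java.util.Map;

public class GatedFlagSketch
{
    // property names mirror the ones in the diff; the map stands in for Session
    private static final String EVENT_DRIVEN_SCHEDULER = "fault_tolerant_execution_event_driven_scheduler_enabled";
    private static final String FORCE_PREFERRED_WRITE_PARTITIONING = "fault_tolerant_execution_force_preferred_write_partitioning_enabled";

    static boolean isForcePreferredWritePartitioningEnabled(Map<String, Boolean> session)
    {
        if (!session.getOrDefault(EVENT_DRIVEN_SCHEDULER, true)) {
            // supported only by the event-driven scheduler: reverting to the
            // legacy scheduler silently disables the feature instead of failing
            return false;
        }
        return session.getOrDefault(FORCE_PREFERRED_WRITE_PARTITIONING, true);
    }

    public static void main(String[] args)
    {
        System.out.println(isForcePreferredWritePartitioningEnabled(Map.of())); // true
        System.out.println(isForcePreferredWritePartitioningEnabled(
                Map.of(EVENT_DRIVEN_SCHEDULER, false))); // false: gated off
    }
}
```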
core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java

@@ -97,8 +97,8 @@ public class QueryManagerConfig
     private int faultTolerantExecutionMaxTaskSplitCount = 256;
    private DataSize faultTolerantExecutionTaskDescriptorStorageMaxMemory = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15));
     private int faultTolerantExecutionPartitionCount = 50;
-    private boolean faultTolerantPreserveInputPartitionsInWriteStage = true;
     private boolean faultTolerantExecutionEventDrivenSchedulerEnabled = true;
+    private boolean faultTolerantExecutionForcePreferredWritePartitioningEnabled = true;

     @Min(1)
     public int getScheduleSplitBatchSize()
@@ -620,28 +620,27 @@ public QueryManagerConfig setFaultTolerantExecutionPartitionCount(int faultTolerantExecutionPartitionCount)
         return this;
     }

-    public boolean getFaultTolerantPreserveInputPartitionsInWriteStage()
+    public boolean isFaultTolerantExecutionEventDrivenSchedulerEnabled()
     {
-        return faultTolerantPreserveInputPartitionsInWriteStage;
+        return faultTolerantExecutionEventDrivenSchedulerEnabled;
     }

-    @Config("fault-tolerant-execution-preserve-input-partitions-in-write-stage")
-    @ConfigDescription("Ensure single task reads single hash partitioned input partition for stages which write table data")
-    public QueryManagerConfig setFaultTolerantPreserveInputPartitionsInWriteStage(boolean faultTolerantPreserveInputPartitionsInWriteStage)
+    @Config("experimental.fault-tolerant-execution-event-driven-scheduler-enabled")
+    public QueryManagerConfig setFaultTolerantExecutionEventDrivenSchedulerEnabled(boolean faultTolerantExecutionEventDrivenSchedulerEnabled)
     {
-        this.faultTolerantPreserveInputPartitionsInWriteStage = faultTolerantPreserveInputPartitionsInWriteStage;
+        this.faultTolerantExecutionEventDrivenSchedulerEnabled = faultTolerantExecutionEventDrivenSchedulerEnabled;
         return this;
     }

-    public boolean isFaultTolerantExecutionEventDrivenSchedulerEnabled()
+    public boolean isFaultTolerantExecutionForcePreferredWritePartitioningEnabled()
     {
-        return faultTolerantExecutionEventDrivenSchedulerEnabled;
+        return faultTolerantExecutionForcePreferredWritePartitioningEnabled;
     }

-    @Config("experimental.fault-tolerant-execution-event-driven-scheduler-enabled")
-    public QueryManagerConfig setFaultTolerantExecutionEventDrivenSchedulerEnabled(boolean faultTolerantExecutionEventDrivenSchedulerEnabled)
+    @Config("experimental.fault-tolerant-execution-force-preferred-write-partitioning-enabled")
+    public QueryManagerConfig setFaultTolerantExecutionForcePreferredWritePartitioningEnabled(boolean faultTolerantExecutionForcePreferredWritePartitioningEnabled)
     {
-        this.faultTolerantExecutionEventDrivenSchedulerEnabled = faultTolerantExecutionEventDrivenSchedulerEnabled;
+        this.faultTolerantExecutionForcePreferredWritePartitioningEnabled = faultTolerantExecutionForcePreferredWritePartitioningEnabled;
         return this;
     }

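The config side pairs a fluent airlift-style @Config setter with an is-getter; the key experimental.fault-tolerant-execution-force-preferred-write-partitioning-enabled binds to the setter from the server's config.properties. A minimal sketch exercising just the bean shown in this diff (assumes trino-main is on the classpath; QueryManagerConfig lives in io.trino.execution):

```java
import io.trino.execution.QueryManagerConfig;

public class ConfigBeanSketch
{
    public static void main(String[] args)
    {
        QueryManagerConfig config = new QueryManagerConfig();
        // default matches the field initializer in the diff above
        System.out.println(config.isFaultTolerantExecutionForcePreferredWritePartitioningEnabled()); // true

        // setters return `this`, so they chain in the usual airlift config style
        config.setFaultTolerantExecutionForcePreferredWritePartitioningEnabled(false);
        System.out.println(config.isFaultTolerantExecutionForcePreferredWritePartitioningEnabled()); // false
    }
}
```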
core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java

@@ -31,9 +31,7 @@
 import io.trino.sql.planner.plan.PlanFragmentId;
 import io.trino.sql.planner.plan.PlanNode;
 import io.trino.sql.planner.plan.PlanNodeId;
-import io.trino.sql.planner.plan.PlanVisitor;
 import io.trino.sql.planner.plan.RemoteSourceNode;
-import io.trino.sql.planner.plan.TableWriterNode;

 import javax.inject.Inject;

@@ -48,10 +46,10 @@
 import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMaxTaskSplitCount;
 import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTargetTaskInputSize;
 import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTargetTaskSplitCount;
-import static io.trino.SystemSessionProperties.getFaultTolerantPreserveInputPartitionsInWriteStage;
 import static io.trino.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION;
 import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION;
 import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION;
+import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION;
 import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION;
 import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
 import static io.trino.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
@@ -186,43 +184,19 @@ private SplitAssigner createSplitAssigner(
                     maxArbitraryDistributionTaskSplitCount);
         }
         if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getCatalogHandle().isPresent() ||
-                (partitioning.getConnectorHandle() instanceof MergePartitioningHandle)) {
-            return new HashDistributionSplitAssigner(
+                (partitioning.getConnectorHandle() instanceof MergePartitioningHandle) ||
+                partitioning.equals(SCALED_WRITER_HASH_DISTRIBUTION)) {
+            return HashDistributionSplitAssigner.create(
                     partitioning.getCatalogHandle(),
                     partitionedSources,
                     replicatedSources,
-                    getFaultTolerantExecutionTargetTaskInputSize(session).toBytes(),
-                    outputDataSizeEstimates,
                     sourcePartitioningScheme,
-                    getFaultTolerantPreserveInputPartitionsInWriteStage(session) && isWriteFragment(fragment));
+                    outputDataSizeEstimates,
+                    fragment,
+                    getFaultTolerantExecutionTargetTaskInputSize(session).toBytes());
         }

         // other partitioning handles are not expected to be set as a fragment partitioning
         throw new IllegalArgumentException("Unexpected partitioning: " + partitioning);
     }
-
-    private static boolean isWriteFragment(PlanFragment fragment)
-    {
-        PlanVisitor<Boolean, Void> visitor = new PlanVisitor<>()
-        {
-            @Override
-            protected Boolean visitPlan(PlanNode node, Void context)
-            {
-                for (PlanNode child : node.getSources()) {
-                    if (child.accept(this, context)) {
-                        return true;
-                    }
-                }
-                return false;
-            }
-
-            @Override
-            public Boolean visitTableWriter(TableWriterNode node, Void context)
-            {
-                return true;
-            }
-        };
-
-        return fragment.getRoot().accept(visitor, null);
-    }
 }
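The removed isWriteFragment helper is a standard recursive plan-tree search; after this change the write-stage decision moves into HashDistributionSplitAssigner.create, which now receives the whole fragment. For reference, a self-contained sketch of the same search pattern over a toy node type (Node is a hypothetical stand-in, not Trino's PlanNode):

```java
// Sketch of the pattern the removed isWriteFragment helper used: depth-first
// search of a plan tree for a particular node kind.
import java.util.List;

public class PlanWalkSketch
{
    record Node(String name, List<Node> sources) {}

    static boolean containsTableWriter(Node node)
    {
        if (node.name().equals("TableWriter")) {
            return true;
        }
        // recurse into the node's sources, short-circuiting on the first hit
        return node.sources().stream().anyMatch(PlanWalkSketch::containsTableWriter);
    }

    public static void main(String[] args)
    {
        Node scan = new Node("TableScan", List.of());
        Node writer = new Node("TableWriter", List.of(scan));
        System.out.println(containsTableWriter(writer)); // true
        System.out.println(containsTableWriter(scan));   // false
    }
}
```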
core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantPartitioningSchemeFactory.java

@@ -30,6 +30,7 @@
 import java.util.stream.IntStream;

 import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION;
+import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION;
 import static java.util.Objects.requireNonNull;

 @NotThreadSafe
@@ -55,7 +56,7 @@ public FaultTolerantPartitioningScheme get(PartitioningHandle handle)

     private FaultTolerantPartitioningScheme create(PartitioningHandle partitioningHandle)
     {
-        if (partitioningHandle.equals(FIXED_HASH_DISTRIBUTION)) {
+        if (partitioningHandle.equals(FIXED_HASH_DISTRIBUTION) || partitioningHandle.equals(SCALED_WRITER_HASH_DISTRIBUTION)) {
             return new FaultTolerantPartitioningScheme(
                     partitionCount,
                     Optional.of(IntStream.range(0, partitionCount).toArray()),
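With this change, SCALED_WRITER_HASH_DISTRIBUTION gets the same fault-tolerant scheme as FIXED_HASH_DISTRIBUTION: partitionCount partitions plus an identity mapping array built by the IntStream expression above. A tiny sketch of that array (the count 50 mirrors the fault_tolerant_execution_partition_count default shown earlier in this diff):

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class IdentityMappingSketch
{
    public static void main(String[] args)
    {
        int partitionCount = 50; // fault_tolerant_execution_partition_count default
        // identity mapping: index i maps to i
        int[] mapping = IntStream.range(0, partitionCount).toArray();
        System.out.println(Arrays.toString(Arrays.copyOfRange(mapping, 0, 5))); // [0, 1, 2, 3, 4]
    }
}
```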