Enable preferred write partitioning for FTE by default #14735
Conversation
Is it about local "preferred" partitioning in insert tasks? It seems that after #14140 we could just enable "preferred" partitioning within a task by default and leave only the global decision to stats. The improvement here doesn't seem specific to FTE.
A few questions:
In FTE we can decide at runtime to schedule multiple tasks to consume data from a single partition, assigning a subset of the data to each task. It is about scheduling more tasks.
@@ -238,6 +247,13 @@ public void update(ListMultimap<Integer, Split> splits, boolean noMoreSplits)
assigner.assign(planNodeId, splits, noMoreSplits).update(callback);
if (noMoreSplits) {
finishedSources.add(planNodeId);

Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
Why was this one not caught by tests? It looks like we are missing coverage. We should add ALTER TABLE ... EXECUTE OPTIMIZE tests for Iceberg on FTE, not necessarily even with injected failures. Maybe it would just make sense to run TestIcebergParquetConnectorTest with FTE.
Let me try to do that
I also enabled optimize tests in Hive
So you need partitioning in the global exchange to set up spooling exchanges correctly?
@@ -52,6 +55,10 @@ public boolean isEnabled(Session session)
@Override
public Result apply(TableExecuteNode node, Captures captures, Context context)
{
if (getRetryPolicy(context.getSession()) == RetryPolicy.TASK && isFaultTolerantExecutionForcePreferredWritePartitioningEnabled(context.getSession())) {
// fault tolerant execution mitigates skew
Maybe elaborate a bit on that. I don't think it's obvious.
Added a comment
@@ -57,6 +60,11 @@ public boolean isEnabled(Session session)
@Override
public Result apply(TableWriterNode node, Captures captures, Context context)
{
if (getRetryPolicy(context.getSession()) == RetryPolicy.TASK && isFaultTolerantExecutionForcePreferredWritePartitioningEnabled(context.getSession())) {
// fault tolerant execution mitigates skew
Maybe elaborate a bit on that. I don't think it's obvious.
Added a comment
Force-pushed 8c27483 to 7728705
I believe that. Can you please augment the PR description with an explanation of why only FTE is being changed and pipelined mode isn't (they have the same limitations in this regard)?
{
if (!isFaultTolerantExecutionEventDriverSchedulerEnabled(session)) {
// supported only in event driven scheduler
return false;
should we throw instead?
Hopefully we will remove the legacy scheduler soon. I added this more as a precaution in case somebody needs to revert to the old scheduler, which could result in skew issues.
if (remainingSourcesSizeInBytes <= targetPartitionSizeInBytes / 4 && canSplit.test(largestSource)) {
long targetLargestSourceSizeInBytes = targetPartitionSizeInBytes - remainingSourcesSizeInBytes;
splitBy = Optional.of(largestSource);
subPartitionCount = toIntExact(largestSourceSizeInBytes / targetLargestSourceSizeInBytes) + 1;
Should we put some limit on how many sub-partitions we can generate? I do not see a strict reason, but I may be missing something.
Currently we don't set any arbitrary limit on the number of arbitrarily distributed tasks. Here it should be similar in theory (it is effectively an "arbitrary distribution", just within a single partition).
.orElseThrow();
long largestSourceSizeInBytes = sourceSizes.get(largestSource);
long remainingSourcesSizeInBytes = partitionSizeInBytes - largestSourceSizeInBytes;
if (remainingSourcesSizeInBytes <= targetPartitionSizeInBytes / 4 && canSplit.test(largestSource)) {
why 4?
Basically I wanted to limit the overhead of splitting, as the remaining sources have to be replicated when one of the sources is split. This probably sounds too arbitrary and will have to be revisited in the future. An alternative would be to simply give up when there's more than a single source (for TableWrite only a single source is expected) and revisit it once we start working on skew mitigation for Join.
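For illustration, here is a standalone sketch of the arithmetic in the snippet above, using made-up sizes (not Trino code; the class name and the 4 GB / 10 GB / 0.5 GB numbers are hypothetical):

```java
public class SubPartitionCountExample
{
    public static void main(String[] args)
    {
        long targetPartitionSizeInBytes = 4L << 30;     // 4 GB target per task
        long largestSourceSizeInBytes = 10L << 30;      // 10 GB skewed partition
        long remainingSourcesSizeInBytes = 512L << 20;  // 0.5 GB of other sources that would have to be replicated

        // split only when the data to replicate is small (<= 1/4 of the target partition size)
        if (remainingSourcesSizeInBytes <= targetPartitionSizeInBytes / 4) {
            long targetLargestSourceSizeInBytes = targetPartitionSizeInBytes - remainingSourcesSizeInBytes;
            int subPartitionCount = Math.toIntExact(largestSourceSizeInBytes / targetLargestSourceSizeInBytes) + 1;
            System.out.println(subPartitionCount); // prints 3: the skewed partition is consumed by 3 tasks
        }
    }
}
```

With these inputs the remainder (0.5 GB) is below a quarter of the 4 GB target, so the 10 GB partition is split across 3 tasks, each of which also receives a replicated copy of the remaining 0.5 GB.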
@Test
public void testTableWritePreferredWritePartitioningSkewMitigation()
{
// We use TEXTFILE in this test because is has a very consistent and predictable size
nit: is -> it
We actually don't use TEXTFILE in this test; the comment is a copy-paste mistake.
public void testTableWritePreferredWritePartitioningSkewMitigation()
{
// We use TEXTFILE in this test because is has a very consistent and predictable size
@Language("SQL") String createTableSql = "" +
nit: use multiline literal and .format(....)
Wow! Java now has multi line!
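For reference, a minimal sketch of the suggested style (a Java 15+ text block combined with formatted); the table and column names below are made up:

```java
public class TextBlockSqlExample
{
    public static void main(String[] args)
    {
        String tableName = "test_skew_mitigation";
        // text block + formatted() instead of a "" + concatenation chain
        String createTableSql = """
                CREATE TABLE %s (
                    id BIGINT,
                    part VARCHAR
                )
                WITH (partitioned_by = ARRAY['part'])
                """.formatted(tableName);
        System.out.println(createTableSql);
    }
}
```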
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;

public abstract class BaseFaultTolerantExecutionTest
I wonder if we should have a separate BaseFaultTolerantExecutionTest or rather add stuff to BaseConnectorTest. I think we should start running BaseConnectorTest-based test classes both in pipelined and in FTE mode.
cc: @findepi
I thought this class might be useful for test cases that are specific to FaultTolerantExecution.
If you dig through, the code looks fine. But I would appreciate some readability improvements, method extraction et al.
Force-pushed 7728705 to ed7a050
Force-pushed c185dc7 to 355f29b
Rebased and ready for review
long partitionSizeInBytes,
long targetPartitionSizeInBytes,
Set<PlanNodeId> partitionedSources,
Map<PlanNodeId, OutputDataSizeEstimate> outputDataSizeEstimates,
Currently we always have estimates. Will that break when we start doing speculative execution, where estimates are not always there? Or will they still be available even then?
That's a good question.
Basically there are two possible scenarios:
- Some upstream tasks have finished. In this scenario we can project estimates by adjusting for the tasks that are still in progress. This will usually be the case when starting a hash-distributed stage that reads data from a source-distributed stage (e.g., a TableScan); a rough sketch of such a projection is shown below.
- No upstream tasks have finished. This scenario is trickier. I guess it would generally indicate that the number of input tasks is small (e.g., fewer than the available slots in the cluster). In such a scenario I guess we would have to resort to simply telling the HashDistributionSplitAssigner to create a fixed number of tasks with uniform partition assignment. It should in theory be good enough, as it would generally indicate that the input data set is not "large". However, if it ends up being problematic, we would probably need to improve the estimate reporting mechanism to allow incomplete tasks to report intermediate "estimates".
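A rough sketch of the projection in the first scenario (illustrative only; the helper below is hypothetical, not the actual estimator, and it assumes each upstream task contributes a similar amount of data):

```java
import java.util.List;

public class EstimateProjectionExample
{
    // Scale the bytes reported by the finished upstream tasks up to the expected total task count.
    static long projectTotalOutputBytes(List<Long> finishedTaskOutputBytes, int totalTaskCount)
    {
        long reported = finishedTaskOutputBytes.stream().mapToLong(Long::longValue).sum();
        return (reported / finishedTaskOutputBytes.size()) * totalTaskCount;
    }

    public static void main(String[] args)
    {
        // 3 of 10 upstream tasks finished, each reporting roughly 1 GB -> projected ~10 GB total
        System.out.println(projectTotalOutputBytes(List.of(1_000_000_000L, 1_100_000_000L, 900_000_000L), 10));
    }
}
```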
@Override
protected Boolean visitPlan(PlanNode node, Void context)
{
for (PlanNode child : node.getSources()) {
nit: does having more than one source mean false?
This check is used to prevent partition merging (to avoid running into the number-of-partitions limit for TableWrite). As long as there's a single TableWriter node (no matter where), it should be safe to assume that the fragment is a TableWrite fragment.
Yeah, I just meant that any node which has a TableWriter as a child has just a single source. Or not necessarily?
Anyway, no changes requested, just a curious question.
It should be true within a single fragment. It is not necessarily true overall due to PushTableWriteThroughUnion.
@@ -323,9 +323,6 @@ public PlanNode visitRefreshMaterializedView(RefreshMaterializedViewNode node, R
@Override
public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<FragmentProperties> context)
{
if (node.getPartitioningScheme().isPresent()) {
Can you explain why we are dropping this part?
The Fragmenter has to choose the distribution of a stage. Since the partitioning handle of an exchange is getting changed from FIXED_HASH_DISTRIBUTION to SCALED_WRITER_HASH_DISTRIBUTION (and similarly for connector-specific partitioning), the Fragmenter becomes confused, as it sees different partitioning handles in TableWriterNode and ExchangeNode.

I honestly don't know what the best way to approach this is. On one hand, I don't feel that taking the handle in TableWriterNode into account is necessary at this point, as the distribution should generally be determined by the ExchangeNodes / TableScan. If an incorrect exchange is inserted, the plan is broken anyway. However, it still might be a good "sanity" check. But to keep it functional we would have to overwrite the partitioning in TableWriterNode, which would mean modifying the TableWriterNode at the AddExchanges step, which also seems a little weird, especially since AddLocalExchange may not expect it.

@sopel39 @gaurav8297 Thoughts?
else {
PartitioningHandle partitioningHandle = partitioningScheme.get().getPartitioning().getHandle();
verify(!(partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle));
verify(
Why are we sure that this will never be the case? Could you add a comment pointing to where it is enforced?
Basically what these lengthy checks are saying is that a TableWriterNode should either have no partitioning requirement, or, if there is a requirement, it must be hash-based partitioning. And hash-based partitioning is possible either through a system handle (FIXED_HASH_DISTRIBUTION) or through a connector-specific partitioning, which is assumed to be hash based.
&& writerTarget.supportsReportingWrittenBytes(plannerContext.getMetadata(), session)
&& writerTarget.supportsMultipleWritersPerPartition(plannerContext.getMetadata(), session)
// do not insert an exchange if partitioning is compatible
&& !newSource.getProperties().isCompatibleTablePartitioningWith(partitioningScheme.get().getPartitioning(), false, plannerContext.getMetadata(), session)) {
It looks like the isCompatibleTablePartitioningWith check does not change anything, as you do this check anyway in line 678, and it only matters if partitioning is not compatible.
If we don't check it here, the partitioning can get re-assigned and then the check at line 678 won't pass.
@@ -314,6 +321,150 @@ PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1))))
.run();
}

@Test
A test with a missing OutputDataSizeEstimate? Or is there one and I missed it?
The missing estimates are covered by testMissingEstimates. Or do you think we need a test specific to splitting?
if (taskDescriptorSplitIds.contains(splitId) && splittableSources.contains(source)) {
return;
}
why do we need this?
If a source is "splittable", a split is expected to appear in only one of the sub-partitions. If the source is not splittable, it is expected to be "broadcast" and present in every sub-partition.
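To make the invariant concrete, here is a self-contained sketch with made-up data structures (not the assigner's real types): splits of a splittable source land in exactly one sub-partition, while splits of a non-splittable source are replicated to every sub-partition.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SubPartitionInvariantExample
{
    public static void main(String[] args)
    {
        // hypothetical assignment: one skewed output partition split into two sub-partitions (tasks)
        Set<String> splittableSources = Set.of("partitionedSource");
        List<Map<String, Set<Integer>>> subPartitions = List.of(
                Map.of("partitionedSource", Set.of(1, 2), "replicatedSource", Set.of(10, 11)),
                Map.of("partitionedSource", Set.of(3), "replicatedSource", Set.of(10, 11)));

        for (String source : List.of("partitionedSource", "replicatedSource")) {
            if (splittableSources.contains(source)) {
                // splittable: each split id must appear in exactly one sub-partition
                long assignments = subPartitions.stream().mapToLong(p -> p.get(source).size()).sum();
                long distinctSplits = subPartitions.stream().flatMap(p -> p.get(source).stream()).distinct().count();
                System.out.println(source + " split exactly once: " + (assignments == distinctSplits));
            }
            else {
                // not splittable: the same split set is replicated ("broadcast") to every sub-partition
                Set<Integer> first = subPartitions.get(0).get(source);
                System.out.println(source + " broadcast to all: " + subPartitions.stream().allMatch(p -> p.get(source).equals(first)));
            }
        }
    }
}
```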
partitioningHandle.getCatalogHandle(),
partitioningHandle.getTransactionHandle(),
partitioningHandle.getConnectorHandle(),
true)));
Can we always do that? Doesn't it depend on the nature of the connector partitioning?
We are checking for && writerTarget.supportsMultipleWritersPerPartition(plannerContext.getMetadata(), session)
Force-pushed 355f29b to e52cf11
Discussed offline. The idea is to set the scaled-writers partitioning handle in TableWriterNode.
@@ -323,9 +323,6 @@ public PlanNode visitRefreshMaterializedView(RefreshMaterializedViewNode node, R
@Override
public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<FragmentProperties> context)
{
if (node.getPartitioningScheme().isPresent()) {
context.get().setDistribution(node.getPartitioningScheme().get().getPartitioning().getHandle(), metadata, session);
}
return context.defaultRewrite(node, context.get());
why not:

context.defaultRewrite(node, context.get()); // RemoteExchange will set scaled distribution
if (node.getPartitioningScheme().isPresent()) {
    context.get().setDistribution(...)
}

In setDistribution you can check that the distribution you try to set is the same as the pre-set one and accepts scaling of writers.
partitioningCacheMap.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(queryStateMachine.getSession(), handle));
partitioningCacheMap.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(
queryStateMachine.getSession(),
// TODO: support hash distributed writer scaling
add Github issue (if there isn't one)
Partitions should never be merged for the write stage, given the risk of running into the maximum-number-of-partitions limit enforced by writers
Extract helper classes to avoid methods with too many arguments
Implement skew mitigation to avoid running into low write concurrency when the number of output partitions is small
Force-pushed e52cf11 to 1f3d8ab
I tried implementing it, but it doesn't seem to be trivial. It is unclear where to overwrite the handle. Most likely an additional rule would be required (one that runs after the rule that enables preferred write partitioning). Also, it feels like I need to think more about whether it is ideal to overwrite the partitioning handle; strictly speaking it somehow seems a little counter-intuitive. I think we should consider going forward with the current approach, and then think about whether it can be refactored holistically for all cases (including scaled writes to an unpartitioned table).
@@ -176,6 +176,7 @@
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
public static final String FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED = "fault_tolerant_execution_event_driven_scheduler_enabled";
public static final String FORCE_FIXED_DISTRIBUTION_FOR_PARTITIONED_OUTPUT_OPERATOR_ENABLED = "force_fixed_distribution_for_partitioned_output_operator_enabled";
public static final String FAULT_TOLERANT_EXECUTION_FORCE_PREFERRED_WRITE_PARTITIONING_ENABLED = "fault_tolerant_execution_force_preferred_write_partitioning_enabled";
docs?
I would prefer not to document it and to drop it after some period. There's no real reason why somebody would want to disable it unless there's a correctness or performance bug.
looks good to me. Let's iterate from here
Description
Implement skew mitigation to avoid running into low write concurrency when the number of output partitions is small
Non-technical explanation
Some connectors (such as Hive) have a limit on the number of partitions that can be written to in a single task. The default strategy for writes is to have all tasks (workers) write to all partitions while distributing the data between the workers in a random (round-robin) fashion. This way it is not possible to write to more than 100 partitions by default.
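The sketch below contrasts the default round-robin distribution with hash-by-partition distribution (what the preferred_write_partitioning feature described below does). It is illustrative only: the writer and partition counts are made up and the model is deliberately simplified.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class WriteDistributionExample
{
    public static void main(String[] args)
    {
        int writers = 4;
        int partitions = 301; // a table with more partitions than the default per-writer limit of 100

        Map<Integer, Set<Integer>> roundRobin = new HashMap<>();
        Map<Integer, Set<Integer>> hashByPartition = new HashMap<>();
        for (int row = 0; row < 100_000; row++) {
            int partition = row % partitions;
            // round-robin: rows go to writers regardless of partition -> every writer touches ~all partitions
            roundRobin.computeIfAbsent(row % writers, w -> new HashSet<>()).add(partition);
            // hash by partition column: each partition is owned by exactly one writer
            hashByPartition.computeIfAbsent(Math.floorMod(Integer.hashCode(partition), writers), w -> new HashSet<>()).add(partition);
        }
        roundRobin.forEach((w, p) -> System.out.println("round-robin writer " + w + " touches " + p.size() + " partitions"));
        hashByPartition.forEach((w, p) -> System.out.println("hash-partitioned writer " + w + " touches " + p.size() + " partitions"));
    }
}
```

With round-robin every writer ends up touching all 301 partitions and would trip a 100-partitions-per-writer limit, while hash-by-partition keeps each writer at roughly 75 partitions, at the cost of a single writer per partition (the skew this PR mitigates).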
To allow the engine to insert into more than 100 partitions in a single query, the preferred_write_partitioning feature was introduced (#2358) as an opt-in feature, and a cost-based optimization was added later to enable the feature based on statistics (#6920). However, the cost-based rule is not always reliable, as it may mis-estimate the number of partitions being written: the preferred_write_partitioning feature may not kick in and a query may fail by running into the limit.

This PR intends to provide a reliable solution to the aforementioned problem.
The idea is to always enable preferred_write_partitioning when fault-tolerant execution is enabled. When it turns out that the number of partitions is low, fault-tolerant execution can mitigate the skew by assigning more tasks to process a skewed partition. Currently such an optimization is only possible in fault-tolerant execution; implementing a similar technique in pipelined execution is more challenging, as today there's a one-to-one relation between a partition and a consumer task.

This PR only addresses the distributed part of the problem (it allocates multiple tasks to process a single partition). It is still possible that parallelism within a task could be lower than desired; the local parallelism issue is being addressed by a different PR: #14140
Release notes
( ) This is not user-visible or docs only and no release notes are required.
(x) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: