Add support for preferred insert or create table layouts #2358
this is ready to go
@@ -2002,7 +2002,16 @@ private static Domain buildColumnDomain(ColumnHandle column, List<HivePartition>

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table, typeManager);
if (!hiveBucketHandle.isPresent()) {
    return Optional.empty();
    // return preferred layout which is partitioned by partition columns
Apply partitioning by partition columns also when the table is bucketed.
When the table is bucketed, partitioning data on partition columns should be required, but this is outside the scope of this PR.
For example,
- currently: if you have 10 buckets per partition, we partition data by bucket columns. This means only 10 workers will write data for all partitions.
- better: if you partition data by bucket columns AND partition columns, still 10 workers will write data per partition. However, the set of workers changes from partition to partition, which improves parallelism.
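To make the parallelism argument concrete, here is a rough simulation. It is only a sketch: `workerFor` is a hypothetical stand-in for hash partitioning (a plain modulo over the key hash), not the engine's actual partition function, and the cluster size of 100 workers is an assumption chosen for illustration.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class WriterAssignmentSketch
{
    private WriterAssignmentSketch() {}

    // Hypothetical stand-in for hash partitioning: map a key hash to one of workerCount writers.
    static int workerFor(int keyHash, int workerCount)
    {
        return Math.floorMod(keyHash, workerCount);
    }

    // Key = bucket only: every partition lands on the same writers.
    static int writersByBucketOnly(int partitions, int buckets, int workers)
    {
        Set<Integer> used = new HashSet<>();
        for (int p = 0; p < partitions; p++) {
            for (int b = 0; b < buckets; b++) {
                used.add(workerFor(Integer.hashCode(b), workers));
            }
        }
        return used.size();
    }

    // Key = (partition, bucket): each partition still uses at most `buckets` writers,
    // but different partitions can land on different writers.
    static int writersByBucketAndPartition(int partitions, int buckets, int workers)
    {
        Set<Integer> used = new HashSet<>();
        for (int p = 0; p < partitions; p++) {
            for (int b = 0; b < buckets; b++) {
                used.add(workerFor(Objects.hash(p, b), workers));
            }
        }
        return used.size();
    }

    public static void main(String[] args)
    {
        System.out.println("bucket only:        " + writersByBucketOnly(20, 10, 100));
        System.out.println("bucket + partition: " + writersByBucketAndPartition(20, 10, 100));
    }
}
```

With 20 partitions and 10 buckets, the bucket-only key never uses more than 10 distinct writers, while the combined key spreads the work over many more of them.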
This adds |
Some more context on the engine vs. connector approach. While a connector-specific toggle could be a stop-gap here, I don't think it's a good approach. This PR opens a way for the CBO to decide whether to use the preferred insert layout. For example, there could be a very basic CBO rule that chooses the preferred partitioning if the number of groups is greater than some constant. I think such a rule would work well enough in practice. This PR is a step toward it. I don't like too many manual toggles. The SPI needs to be changed regardless of approach, as connectors need to be able to return
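A sketch of what such a basic rule could look like. Everything here is hypothetical: the class name, the threshold value, and the choice to fall back to the default layout when the estimate is unknown are assumptions for illustration, not part of this PR.

```java
final class PreferredLayoutRuleSketch
{
    // Hypothetical threshold; the comment above only says "some constant".
    static final double MIN_GROUPS_FOR_PREFERRED_LAYOUT = 50;

    private PreferredLayoutRuleSketch() {}

    // Decide whether to use the connector's preferred partitioning, given the
    // estimated number of distinct groups (e.g. partitions) being written.
    static boolean usePreferredLayout(double estimatedGroupCount)
    {
        // Assumption: an unknown estimate (NaN) keeps the default layout.
        if (Double.isNaN(estimatedGroupCount)) {
            return false;
        }
        return estimatedGroupCount > MIN_GROUPS_FOR_PREFERRED_LAYOUT;
    }

    public static void main(String[] args)
    {
        System.out.println(usePreferredLayout(1000));
        System.out.println(usePreferredLayout(10));
    }
}
```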
if (getTaskWriterCount(session) > 1 && !node.getPartitioningScheme().isPresent()) {
    requiredProperties = fixedParallelism();
    preferredProperties = fixedParallelism();
    // TODO: add support for arbitrary partitioning in local exchanges
@dain mentioned there was some PR to add support for arbitrary partitioning in local exchanges.
@@ -58,7 +58,7 @@ public ConnectorNewTableLayout getLayout()
Not sure I understand what the commit message is referring to. What's "evenly partitioning"?
import static java.util.Objects.requireNonNull;

public class ConnectorNewTableLayout
{
    private final ConnectorPartitioningHandle partitioning;
    private final Optional<ConnectorPartitioningHandle> partitioning;
This is strange. Why would partitioning be missing when partitioningColumns is present?
> This is strange. Why would partitioning be missing when partitioningColumns is present?

If you just care that data is partitioned on columns, but don't care how exactly, then you don't need to specify partitioning.
if (writeTableLayout.get().getLayout().getPartitioning().isPresent()) {
    partitioningHandle = writeTableLayout.get().getPartitioning();
}
else {
    // empty connector partitioning handle means evenly partitioning on partitioning columns
    partitioningHandle = FIXED_HASH_DISTRIBUTION;
}
I find the fact that we're looking at getLayout().getPartitioning() to decide whether to call getPartitioning() very confusing, and probably a sign of an API design issue.
It'd be cleaner to make getPartitioning() for NewTableLayout return an Optional and just do:
PartitioningHandle partitioningHandle = writeTableLayout.get()
        .getPartitioning()
        .orElse(FIXED_HASH_DISTRIBUTION);
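A self-contained sketch of that shape, for illustration only. The `NewTableLayout` class below is a hypothetical simplification that uses plain strings in place of the real partitioning handle types; it just shows the caller collapsing to a single `orElse` once the getter returns an `Optional`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

final class OptionalPartitioningSketch
{
    // Stand-in for the engine's FIXED_HASH_DISTRIBUTION handle.
    static final String FIXED_HASH_DISTRIBUTION = "FIXED_HASH_DISTRIBUTION";

    private OptionalPartitioningSketch() {}

    // Hypothetical, simplified layout: strings replace the real handle types.
    static final class NewTableLayout
    {
        private final Optional<String> partitioning;
        private final List<String> partitioningColumns;

        NewTableLayout(Optional<String> partitioning, List<String> partitioningColumns)
        {
            this.partitioning = Objects.requireNonNull(partitioning, "partitioning is null");
            this.partitioningColumns = Objects.requireNonNull(partitioningColumns, "partitioningColumns is null");
        }

        Optional<String> getPartitioning()
        {
            return partitioning;
        }

        List<String> getPartitioningColumns()
        {
            return partitioningColumns;
        }
    }

    // An empty partitioning means "partition on the columns, any hash scheme will do".
    static String resolvePartitioning(NewTableLayout layout)
    {
        return layout.getPartitioning().orElse(FIXED_HASH_DISTRIBUTION);
    }

    public static void main(String[] args)
    {
        NewTableLayout columnsOnly = new NewTableLayout(Optional.empty(), Arrays.asList("ds"));
        System.out.println(resolvePartitioning(columnsOnly));
    }
}
```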
requiredProperties = fixedParallelism();
preferredProperties = fixedParallelism();
// TODO: add support for arbitrary partitioning in local exchanges
boolean hasFixedHashDistribution = node.getPartitioningScheme().isPresent()
Maybe...
boolean hasFixedHashDistribution = node.getPartitioningScheme()
        .map(scheme -> scheme.getPartitioning().getHandle())
        .filter(isEqual(FIXED_HASH_DISTRIBUTION))
        .isPresent();
or
boolean hasFixedHashDistribution = node.getPartitioningScheme()
        .filter(scheme -> scheme.getPartitioning().getHandle().equals(FIXED_HASH_DISTRIBUTION))
        .isPresent();
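Both forms should behave the same. A minimal check under stated assumptions: `Scheme` is a hypothetical stand-in that carries only a handle, the handle is a plain string rather than the engine's type, and `isEqual` is taken to be `java.util.function.Predicate.isEqual`.

```java
import java.util.Optional;

import static java.util.function.Predicate.isEqual;

final class FixedHashCheckSketch
{
    // Stand-in for the engine's FIXED_HASH_DISTRIBUTION handle.
    static final String FIXED_HASH_DISTRIBUTION = "FIXED_HASH_DISTRIBUTION";

    private FixedHashCheckSketch() {}

    // Hypothetical minimal scheme: it only carries a handle.
    static final class Scheme
    {
        private final String handle;

        Scheme(String handle)
        {
            this.handle = handle;
        }

        String getHandle()
        {
            return handle;
        }
    }

    // First variant: project to the handle, then filter on equality.
    static boolean viaMapAndFilter(Optional<Scheme> scheme)
    {
        return scheme
                .map(Scheme::getHandle)
                .filter(isEqual(FIXED_HASH_DISTRIBUTION))
                .isPresent();
    }

    // Second variant: filter the scheme directly.
    static boolean viaFilterOnly(Optional<Scheme> scheme)
    {
        return scheme
                .filter(s -> s.getHandle().equals(FIXED_HASH_DISTRIBUTION))
                .isPresent();
    }

    public static void main(String[] args)
    {
        Optional<Scheme> fixedHash = Optional.of(new Scheme(FIXED_HASH_DISTRIBUTION));
        System.out.println(viaMapAndFilter(fixedHash) == viaFilterOnly(fixedHash));
    }
}
```

An absent scheme and a scheme with a different handle both yield `false` in either variant, so the choice is purely about readability.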
This can move inside the if (getTaskWriterCount(session) > 1) { block
ac