
Handle partition schema evolution in partitions metadata #12416

Merged
2 commits merged into trinodb:master from homar/fix_iceberg_partitions_metadata on May 25, 2022

Conversation

@homar homar commented May 16, 2022

Description

Provides partitioning information based on the set of all columns that were used in any partition spec.

Related issues, pull requests, and links

Fixes: #12323

Documentation

( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

( ) No release notes entries required.
( ) Release notes entries required with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@@ -199,8 +206,9 @@ private Map<StructLikeWrapper, IcebergStatistics> getStatisticsByPartition(Table
.acceptDataFile(dataFile, fileScanTask.spec());
}

- return partitions.entrySet().stream()
+ ImmutableMap<StructLikeWrapper, IcebergStatistics> collect = partitions.entrySet().stream()
Contributor

drop variable assignment.

@@ -140,6 +142,11 @@ public ConnectorTableMetadata getTableMetadata()
return connectorTableMetadata;
}

private List<PartitionField> getAllPartitionFields(Table icebergTable)
{
return icebergTable.specs().values().stream().flatMap(x -> x.fields().stream()).collect(toUnmodifiableList());
Contributor

Do we need to perform deduplication here?

Member

Yeah, I think we do

@findinpath
Contributor

Can you please add a test inspired by https://blog.starburst.io/trino-on-ice-ii-in-place-table-evolution-and-cloud-compatibility-with-iceberg that uses partition transforms (https://trino.io/docs/current/connector/iceberg.html#partitioned-tables), e.g. changing from month(ts) to day(ts)?

Also, a test that adds a partition field, drops it, and later adds it again would be welcome, to see whether deduplication of the partition fields is needed.
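For illustration, a minimal sketch of such a test in the style of the onTrino()/onSpark() helpers used elsewhere in this PR; the table names, the ts column, and the exact assertion are assumptions, not code from the PR:

// Hypothetical sketch only: evolve the partitioning from month(ts) to day(ts) and check $partitions.
onTrino().executeQuery(format(
        "CREATE TABLE %s (value INT, ts timestamp(6)) WITH (partitioning = ARRAY['month(ts)'])", trinoTableName));
onTrino().executeQuery(format("INSERT INTO %s VALUES (1, timestamp '2022-04-10 01:02:03')", trinoTableName));

// Change the transform on the Spark side (assumes Iceberg's Spark SQL extensions are enabled).
onSpark().executeQuery(format("ALTER TABLE %s REPLACE PARTITION FIELD months(ts) WITH days(ts)", sparkTableName));
onTrino().executeQuery(format("INSERT INTO %s VALUES (2, timestamp '2022-05-11 01:02:03')", trinoTableName));

// Both the month-based and the day-based partition fields should now show up in the metadata table.
List<Object> partitions = onTrino()
        .executeQuery("SELECT partition FROM iceberg.default.\"test_partition_transform_evolution$partitions\"")
        .column(1);
Assertions.assertThat(partitions).hasSize(2);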

@findinpath
Contributor

This PR could build much simpler tests on top of the functionality exposed by PR #12259.

@alexjo2144

@@ -140,6 +142,11 @@ public ConnectorTableMetadata getTableMetadata()
return connectorTableMetadata;
}

private List<PartitionField> getAllPartitionFields(Table icebergTable)
Member

Suggested change
private List<PartitionField> getAllPartitionFields(Table icebergTable)
private static List<PartitionField> getAllPartitionFields(Table icebergTable)

@@ -140,6 +142,11 @@ public ConnectorTableMetadata getTableMetadata()
return connectorTableMetadata;
}

private List<PartitionField> getAllPartitionFields(Table icebergTable)
{
return icebergTable.specs().values().stream().flatMap(x -> x.fields().stream()).collect(toUnmodifiableList());
Member

toUnmodifiableList() -> toImmutableList()

Member Author

Sorry!

@@ -180,7 +187,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
.useSnapshot(snapshotId.get())
.includeColumnStats();
// TODO make the cursor lazy
- return buildRecordCursor(getStatisticsByPartition(tableScan), icebergTable.spec().fields());
+ return buildRecordCursor(getStatisticsByPartition(tableScan), getAllPartitionFields(icebergTable));
Member

Isn't there a class-level variable with these already?

Suggested change
return buildRecordCursor(getStatisticsByPartition(tableScan), getAllPartitionFields(icebergTable));
return buildRecordCursor(getStatisticsByPartition(tableScan), partitionFields);

Member Author

There isn't, but I can add one.

@homar homar force-pushed the homar/fix_iceberg_partitions_metadata branch 3 times, most recently from c0f61a5 to e3854c2 on May 18, 2022 07:24
@findepi
Member

findepi commented May 19, 2022

@alexjo2144 @findinpath PTAL

{
List<RowType.Field> partitionFields = fields.stream()
.map(field -> RowType.field(
field.name(),
toTrinoType(field.transform().getResultType(schema.findType(field.sourceId())), typeManager)))
.collect(toImmutableList());
List<Integer> ids = fields.stream()
.map(PartitionField::fieldId)
.collect(toImmutableList());
if (partitionFields.isEmpty()) {
Contributor

unrelated to the current commit:

if (partitionFields.isEmpty()) {
            return Optional.empty();
}

this can be replaced with a check

if (fields.isEmpty()) {
            return Optional.empty();
}

at the beginning of the method.

Member Author

I will add it as a separate commit


onTrino().executeQuery(format("CREATE TABLE %s (old_partition_key INT, new_partition_key INT, value date) WITH (PARTITIONING = array['old_partition_key'])", trinoTableName));
onTrino().executeQuery(format("INSERT INTO %s VALUES (1, 10, date '2022-04-10'), (2, 20, date '2022-05-11'), (3, 30, date '2022-06-12'), (2, 20, date '2022-06-13')", trinoTableName));

Contributor

Please check the partitioning before making changes to the partition fields.

.column(1);
Set<String> partitions = partitioning.stream().map(String::valueOf).collect(toUnmodifiableSet());
Assertions.assertThat(partitions.size()).isEqualTo(3);
Assertions.assertThat(partitions).containsAll(ImmutableList.of(
Contributor

@findinpath findinpath May 19, 2022

Why is the old_partition_key shown here?

The current snapshot of the table has only new_partition_key as partition key.

Checking the partitions metadata table with the Iceberg Spark implementation also shows only new_partition_key.

Member Author

Discussed offline; I will add more queries against Spark to show that the behaviour is the same.
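A sketch of what such a cross-check could look like; the `.partitions` metadata table is Iceberg's Spark-side counterpart of `$partitions`, and the assertion is illustrative:

// Hypothetical sketch: read Iceberg's partitions metadata table through Spark and compare with Trino's output.
List<Object> sparkPartitions = onSpark()
        .executeQuery(format("SELECT partition FROM %s.partitions", sparkTableName))
        .column(1);
// Expect the same partition structs as on the Trino side, i.e. entries that still carry the
// dropped old_partition_key next to new_partition_key.
Assertions.assertThat(sparkPartitions).hasSize(3);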

private static class IcebergPartitionColumn
{
private final RowType rowType;
private final List<Integer> ids;
Member

fieldIds?

io.trino.spi.type.Type trinoType = partitionColumnType.rowType.getFields().get(i).getType();
Object value = null;
for (int j = 0; j < partitionStruct.structType.fields().size(); j++) {
if (partitionStruct.structType.fields().get(j).fieldId() == partitionColumnType.ids.get(i)) {
Member

Can we use a Map from fieldId -> Type rather than doing the double iteration on the lists?

Member Author

I may be missing something, but I failed to create such a map.
The reason for this inner loop is not to find the type (that is found above it) but to assign data from partitionStruct to the correct partition field. The problem is that partitionStruct comes from reading the files via fileScanTasks, and here I am trying to match that with the partitioning columns that come from reading the table spec.
My assumption is that neither of these places contains all the information on its own, which is why I need to match them.
It would be great if I'm wrong about that; please point it out.
Though before I started changing this it was already done this way; maybe that was wrong and I should have tried to change it, but I doubt it.

Member Author

@homar homar May 20, 2022

Hmm, I may have figured out how to improve this, though.
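For context, the improvement that later lands in the PR is essentially a fieldId-to-position lookup built once per partition struct type; a rough sketch of the idea (surrounding code abbreviated, names follow the snippets in this review):

// Build the lookup once, when the wrapper around the partition struct is created.
Map<Integer, Integer> fieldIdToIndex = new HashMap<>();
List<Types.NestedField> fields = structType.fields();
for (int position = 0; position < fields.size(); position++) {
    fieldIdToIndex.put(fields.get(position).fieldId(), position);
}

// Later, the inner loop over partitionStruct.structType.fields() becomes a direct lookup:
Integer position = fieldIdToIndex.get(partitionColumnType.ids.get(i));
Object value = null;
if (position != null) {
    // read the raw value at that position from the partition struct; conversion to the
    // Trino type stays the same as in the existing cursor code
    value = partitionStruct.structLikeWrapper.get().get(position, Object.class);
}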

"{old_partition_key=2, new_partition_key=null}",
"{old_partition_key=3, new_partition_key=null}"));

onSpark().executeQuery(format("ALTER TABLE %s DROP PARTITION FIELD old_partition_key", sparkTableName));
Member

Can you add an INSERT along with each of these ALTER TABLE sections, so we can see the partitions using this field show up?
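For illustration, the kind of statement being asked for after each ALTER TABLE, following the INSERT pattern already used in this test (the values are placeholders):

// Hypothetical: insert a row after dropping old_partition_key, so a partition written under
// the new spec shows up in the $partitions output as well.
onTrino().executeQuery(format("INSERT INTO %s VALUES (4, 40, date '2022-07-14')", trinoTableName));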

@homar homar force-pushed the homar/fix_iceberg_partitions_metadata branch from e3854c2 to a8b1e3f on May 20, 2022 15:45
.filter(partitionField -> existingColumnsIds.contains(partitionField.sourceId()))
.collect(toImmutableList());

Set<Integer> alreadyExistingFieldIds = new HashSet<>();
Contributor

Nit (optional): extract the logic for filtering duplicates into a separate method, just to improve readability.

private final List<Types.NestedField> nonPartitionPrimitiveColumns;
private final Optional<RowType> partitionColumnType;
private final List<NestedField> nonPartitionPrimitiveColumns;
private final Optional<IcebergPartitionColumn> partitionColumnType;
private final List<io.trino.spi.type.Type> partitionColumnTypes;
Contributor

The field partitionColumnTypes can be dropped now.
It is used only once, in a code branch that is dependent on partitionColumnType.

{
this.structLikeWrapper = structLikeWrapper;
Map<Integer, Integer> fieldIdToIndex = new HashMap<>();
List<NestedField> fields = structType.fields();
Contributor

this.fieldIdToIndex = fields.stream().collect(Collectors.toMap(NestedField::fieldId, Function.identity()));

Member Author

This is incorrect. I need a mapping from fieldId to its index; your code gives fieldId -> NestedField. I could probably do it with some zipWithIndex-style method, but I think the standard way is more readable.
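For reference, the stream-based equivalent of that zipWithIndex idea would look roughly like this (a sketch; whether it beats the plain loop in readability is debatable):

// Hypothetical alternative: fieldId -> position via an index stream instead of an explicit loop.
Map<Integer, Integer> fieldIdToIndex = IntStream.range(0, fields.size())
        .boxed()
        .collect(toImmutableMap(position -> fields.get(position).fieldId(), position -> position));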

Contributor

Sorry, I didn't pay enough attention here. Thanks for the explanation.

@homar homar force-pushed the homar/fix_iceberg_partitions_metadata branch from a8b1e3f to 30f0d72 on May 23, 2022 10:08
Member

@alexjo2144 alexjo2144 left a comment

A couple of nits, but looks good to me.

- List<Types.NestedField> columns = icebergTable.schema().columns();
- List<PartitionField> partitionFields = icebergTable.spec().fields();
+ List<NestedField> columns = icebergTable.schema().columns();
+ partitionFields = getAllPartitionFields(icebergTable);
Member

Suggested change
partitionFields = getAllPartitionFields(icebergTable);
this.partitionFields = getAllPartitionFields(icebergTable);

.values().stream()
.flatMap(partitionSpec -> partitionSpec.fields().stream())
// skip columns that were dropped
.filter(partitionField -> existingColumnsIds.contains(partitionField.sourceId()))
Member

This is not possible right now because of apache/iceberg#4563, right?

Member Author

@homar homar May 25, 2022

Yes, but not only that; it is also to avoid name conflicts with columns that were renamed.

.filter(partitionField -> existingColumnsIds.contains(partitionField.sourceId()))
.collect(toImmutableList());

return filterOutDuplicates(visiblePartitionFields);
Member

Use Stream#distinct?

Member Author

I can't rely on PartitionField's implementation of hashCode and equals, as they take the transform into account, and I only care about the IDs. I could maybe provide my own comparator or something, but I think this is cleaner now.
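A sketch of what that ID-based deduplication can look like (names follow the snippets above; this is an illustration rather than the exact code from the PR):

private static List<PartitionField> filterOutDuplicates(List<PartitionField> fields)
{
    // Keep only the first occurrence of each partition field ID. PartitionField#equals also
    // compares the transform, which is why Stream#distinct would not do the right thing here.
    Set<Integer> alreadyExistingFieldIds = new HashSet<>();
    ImmutableList.Builder<PartitionField> result = ImmutableList.builder();
    for (PartitionField field : fields) {
        if (alreadyExistingFieldIds.add(field.fieldId())) {
            result.add(field);
        }
    }
    return result.build();
}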

public StructLikeWrapperWithStructType(StructLikeWrapper structLikeWrapper, Types.StructType structType)
{
this.structLikeWrapper = structLikeWrapper;
Map<Integer, Integer> fieldIdToIndex = new HashMap<>();
Member

Use ImmutableMap.Builder

Member Author

Sure, but that's more code.
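For comparison, an ImmutableMap.Builder variant of that constructor snippet might read as follows (a sketch; assumes a Guava version with buildOrThrow):

// Hypothetical variant of the constructor snippet using an immutable map.
ImmutableMap.Builder<Integer, Integer> fieldIdToIndex = ImmutableMap.builder();
List<Types.NestedField> fields = structType.fields();
for (int position = 0; position < fields.size(); position++) {
    fieldIdToIndex.put(fields.get(position).fieldId(), position);
}
this.fieldIdToIndex = fieldIdToIndex.buildOrThrow();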

@@ -292,4 +333,70 @@ private static Block getColumnMetricBlock(RowType columnMetricType, Object min,
rowBlockBuilder.closeEntry();
return columnMetricType.getObject(rowBlockBuilder, 0);
}

private static class StructLikeWrapperWithStructType
Member

Let's rename this, now that the field is fieldIdToIndex instead of the structType itself.

@homar homar force-pushed the homar/fix_iceberg_partitions_metadata branch from 30f0d72 to 4d5e849 on May 25, 2022 07:57
@homar homar force-pushed the homar/fix_iceberg_partitions_metadata branch from 4d5e849 to 9d31fc7 on May 25, 2022 10:14
@findepi findepi merged commit db099cf into trinodb:master May 25, 2022
@findepi
Member

findepi commented May 25, 2022

Merged, thanks!
Thank you @findinpath @alexjo2144 for your review

@github-actions github-actions bot added this to the 382 milestone May 25, 2022
Development

Successfully merging this pull request may close these issues.

Iceberg $partitions metadata table only uses the current Spec
4 participants