Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error message for partitioning by nested field in Iceberg #15142

Merged
merged 1 commit into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.types.TypeUtil.indexParents;
import static org.apache.iceberg.util.SnapshotUtil.schemaFor;

public class IcebergMetadata
Expand Down Expand Up @@ -737,6 +738,7 @@ private Optional<ConnectorTableLayout> getWriteLayout(Schema tableSchema, Partit
return Optional.empty();
}

validateNotPartitionedByNestedField(tableSchema, partitionSpec);
Map<Integer, IcebergColumnHandle> columnById = getColumns(tableSchema, typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getId, identity()));

Expand Down Expand Up @@ -764,6 +766,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

validateNotModifyingOldSnapshot(table, icebergTable);
validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());

beginTransaction(icebergTable);

Expand Down Expand Up @@ -1063,6 +1066,7 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

validateNotModifyingOldSnapshot(table, icebergTable);
validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());

int tableFormatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (tableFormatVersion > OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION) {
Expand Down Expand Up @@ -1469,7 +1473,9 @@ private static void updatePartitioning(Table icebergTable, Transaction transacti
.forEach(updatePartitionSpec::removeField);
}
else {
Set<PartitionField> partitionFields = ImmutableSet.copyOf(parsePartitionFields(schema, partitionColumns).fields());
PartitionSpec partitionSpec = parsePartitionFields(schema, partitionColumns);
validateNotPartitionedByNestedField(schema, partitionSpec);
Set<PartitionField> partitionFields = ImmutableSet.copyOf(partitionSpec.fields());
difference(existingPartitionFields, partitionFields).stream()
.map(PartitionField::name)
.forEach(updatePartitionSpec::removeField);
Expand Down Expand Up @@ -1684,6 +1690,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
validateNotModifyingOldSnapshot(table, icebergTable);
validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());

beginTransaction(icebergTable);
return table.withRetryMode(retryMode);
Expand All @@ -1709,6 +1716,7 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
validateNotModifyingOldSnapshot(table, icebergTable);
validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());

beginTransaction(icebergTable);
return table.withRetryMode(retryMode)
Expand Down Expand Up @@ -1780,6 +1788,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
validateNotModifyingOldSnapshot(table, icebergTable);
validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());

beginTransaction(icebergTable);

Expand Down Expand Up @@ -1810,6 +1819,16 @@ private static void validateNotModifyingOldSnapshot(IcebergTableHandle table, Ta
}
}

/**
 * Verifies that no partition field of {@code partitionSpec} refers to a nested
 * (non-top-level) column of {@code schema}. Iceberg allows such specs, but this
 * connector does not support writing to tables partitioned by nested fields.
 *
 * @throws TrinoException with {@code NOT_SUPPORTED}, naming the offending partition field
 */
public static void validateNotPartitionedByNestedField(Schema schema, PartitionSpec partitionSpec)
{
    // indexParents maps each field id to its parent's field id; only nested
    // fields appear as keys, so key presence identifies a nested source column
    Map<Integer, Integer> parentByFieldId = indexParents(schema.asStruct());
    partitionSpec.fields().stream()
            .filter(partitionField -> parentByFieldId.containsKey(partitionField.sourceId()))
            .findFirst()
            .ifPresent(partitionField -> {
                throw new TrinoException(NOT_SUPPORTED, "Partitioning by nested field is unsupported: " + partitionField.name());
            });
}

private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, boolean runUpdateValidations)
{
Table icebergTable = transaction.table();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,20 @@ public void testCreatePartitionedTableWithNestedTypes()
dropTable("test_partitioned_table_nested_type");
}

@Test
public void testCreatePartitionedTableWithNestedField()
{
assertQueryFails(
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
"CREATE TABLE test_partitioned_table_nested_field(parent ROW(child VARCHAR)) WITH (partitioning = ARRAY['\"parent.child\"'])",
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
"\\QPartitioning by nested field is unsupported: parent.child");
assertQueryFails(
"CREATE TABLE test_partitioned_table_nested_field(grandparent ROW(parent ROW(child VARCHAR))) WITH (partitioning = ARRAY['\"grandparent.parent.child\"'])",
"\\QPartitioning by nested field is unsupported: grandparent.parent.child");
assertQueryFails(
"CREATE TABLE test_partitioned_table_nested_field(grandparent ROW(parent ROW(child VARCHAR))) WITH (partitioning = ARRAY['\"grandparent.parent\"'])",
"\\QUnable to parse partitioning value: Cannot partition by non-primitive source field: struct<3: child: optional string>");
}

@Test
public void testCreatePartitionedTableAs()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,15 @@ public void testChangePartitionTransform()
assertThat(monthPartitionedFiles).hasSize(2);
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testUnsupportedNestedFieldPartition()
{
    // Changing an existing table's partitioning to a nested field must be rejected up front
    String table = "test_unsupported_nested_field_partition_" + randomNameSuffix();
    assertUpdate("CREATE TABLE " + table + "(parent ROW(child VARCHAR))");
    String alterSql = "ALTER TABLE " + table + " SET PROPERTIES partitioning = ARRAY['\"parent.child\"']";
    assertQueryFails(alterSql, "Partitioning by nested field is unsupported: parent.child");
    assertUpdate("DROP TABLE " + table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,36 @@ public void testTrinoReadsSparkPartitionedTable(StorageFormat storageFormat, int
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testPartitionedByNestedFiled()
{
String baseTableName = "test_trino_nested_field_partition_" + randomNameSuffix();
String trinoTableName = trinoTableName(baseTableName);
String sparkTableName = sparkTableName(baseTableName);

onSpark().executeQuery(format("" +
"CREATE TABLE %s (" +
" id INT," +
" parent STRUCT<nested:STRING>)" +
" USING ICEBERG" +
" PARTITIONED BY (parent.nested)" +
" TBLPROPERTIES ('format-version'=2)",
sparkTableName));

assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (2, ROW('b'))"))
.hasMessageContaining("Partitioning by nested field is unsupported: parent.nested");
assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + trinoTableName + " SET id = 2"))
.hasMessageContaining("Partitioning by nested field is unsupported: parent.nested");
assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM " + trinoTableName))
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
.hasMessageContaining("Partitioning by nested field is unsupported: parent.nested");
assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO " + trinoTableName + " t USING " + trinoTableName + " s ON (t.id = s.id) WHEN MATCHED THEN UPDATE SET id = 2"))
.hasMessageContaining("Partitioning by nested field is unsupported: parent.nested");
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE OPTIMIZE"))
.hasMessageContaining("Partitioning by nested field is unsupported: parent.nested");

onSpark().executeQuery("DROP TABLE " + sparkTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int specVersion)
{
Expand Down