diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst index c04f9932253f..df326bbe1e0e 100644 --- a/docs/src/main/sphinx/connector/iceberg.rst +++ b/docs/src/main/sphinx/connector/iceberg.rst @@ -250,6 +250,7 @@ The following table properties can be updated after a table is created: * ``format`` * ``format_version`` +* ``partitioning`` For example, to update a table from v1 of the Iceberg specification to v2: @@ -257,6 +258,14 @@ For example, to update a table from v1 of the Iceberg specification to v2: ALTER TABLE table_name SET PROPERTIES format_version = 2; +Or to set the column ``my_new_partition_column`` as a partition column on a table: + +.. code-block:: sql + + ALTER TABLE table_name SET PROPERTIES partitioning = ARRAY[, 'my_new_partition_column']; + +The current values of a table's properties can be shown using :doc:`SHOW CREATE TABLE `. + .. _iceberg-type-mapping: Type mapping diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 0afc8cf41af8..7df414fdb223 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -103,8 +103,11 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateProperties; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Term; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -207,6 +210,7 @@ public class IcebergMetadata private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 1; private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2; private static final String RETENTION_THRESHOLD = "retention_threshold"; + public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(FILE_FORMAT_PROPERTY, FORMAT_VERSION_PROPERTY, PARTITIONING_PROPERTY); private final TypeManager typeManager; private final JsonCodec commitTaskCodec; @@ -1161,29 +1165,42 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta IcebergTableHandle table = (IcebergTableHandle) tableHandle; BaseTable icebergTable = (BaseTable) catalog.loadTable(session, table.getSchemaTableName()); + Set unsupportedProperties = Sets.difference(properties.keySet(), UPDATABLE_TABLE_PROPERTIES); + if (!unsupportedProperties.isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties)); + } + transaction = icebergTable.newTransaction(); UpdateProperties updateProperties = transaction.updateProperties(); - for (Map.Entry> propertyEntry : properties.entrySet()) { - String trinoPropertyName = propertyEntry.getKey(); - Optional propertyValue = propertyEntry.getValue(); + if (properties.containsKey(FILE_FORMAT_PROPERTY)) { + IcebergFileFormat fileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY) + .orElseThrow(() -> new IllegalArgumentException("The format property cannot be empty")); + updateProperties.defaultFormat(fileFormat.toIceberg()); + } - switch (trinoPropertyName) { - case FILE_FORMAT_PROPERTY: - updateProperties.defaultFormat(((IcebergFileFormat) propertyValue.orElseThrow()).toIceberg()); - break; - case FORMAT_VERSION_PROPERTY: - // UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version - updateProperty(updateProperties, FORMAT_VERSION, propertyValue, formatVersion -> Integer.toString((int) formatVersion)); - break; - default: - // TODO: Support updating partitioning https://github.com/trinodb/trino/issues/12174 - throw new TrinoException(NOT_SUPPORTED, "Updating the " + trinoPropertyName + " property is not supported"); - } + if (properties.containsKey(FORMAT_VERSION_PROPERTY)) { + // UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version + int formatVersion = (int) properties.get(FORMAT_VERSION_PROPERTY) + .orElseThrow(() -> new IllegalArgumentException("The format_version property cannot be empty")); + updateProperties.set(FORMAT_VERSION, Integer.toString((int) formatVersion)); } try { updateProperties.commit(); + } + catch (RuntimeException e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to set new property values", e); + } + + if (properties.containsKey(PARTITIONING_PROPERTY)) { + @SuppressWarnings("unchecked") + List partitionColumns = (List) properties.get(PARTITIONING_PROPERTY) + .orElseThrow(() -> new IllegalArgumentException("The partitioning property cannot be empty")); + updatePartitioning(icebergTable, transaction, partitionColumns); + } + + try { transaction.commitTransaction(); } catch (RuntimeException e) { @@ -1191,14 +1208,35 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta } } - private static void updateProperty(UpdateProperties updateProperties, String icebergPropertyName, Optional value, Function toIcebergString) + private static void updatePartitioning(Table icebergTable, Transaction transaction, List partitionColumns) { - if (value.isPresent()) { - updateProperties.set(icebergPropertyName, toIcebergString.apply(value.get())); + UpdatePartitionSpec updatePartitionSpec = transaction.updateSpec(); + Set existingPartitionFields = icebergTable.spec().fields().stream().collect(toImmutableSet()); + Schema schema = icebergTable.schema(); + if (partitionColumns.isEmpty()) { + existingPartitionFields.stream() + .map(partitionField -> toIcebergTerm(schema, partitionField)) + .forEach(updatePartitionSpec::removeField); } else { - updateProperties.remove(icebergPropertyName); + Set partitionFields = ImmutableSet.copyOf(parsePartitionFields(schema, partitionColumns).fields()); + Sets.difference(existingPartitionFields, partitionFields).forEach(partitionField -> updatePartitionSpec.removeField(partitionField.name())); + Sets.difference(partitionFields, existingPartitionFields).stream() + .map(partitionField -> toIcebergTerm(schema, partitionField)) + .forEach(updatePartitionSpec::addField); + } + + try { + updatePartitionSpec.commit(); } + catch (RuntimeException e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to set new partitioning value", e); + } + } + + private static Term toIcebergTerm(Schema schema, PartitionField partitionField) + { + return Expressions.transform(schema.findColumnName(partitionField.sourceId()), partitionField.transform()); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPartitionEvolution.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPartitionEvolution.java new file mode 100644 index 000000000000..bbd13484e2c8 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPartitionEvolution.java @@ -0,0 +1,157 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.MaterializedRow; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.testng.annotations.Test; + +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static java.lang.Math.toIntExact; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; + +public class TestIcebergPartitionEvolution + extends AbstractTestQueryFramework +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.builder() + .setInitialTables(ImmutableList.of(TpchTable.NATION)) + .build(); + } + + @Test + public void testRemovePartitioning() + { + String tableName = "test_remove_partition_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey', 'truncate(name, 1)']) AS SELECT * FROM nation WHERE nationkey < 10", 10); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY[]"); + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", 15); + + List files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); + List unpartitionedFiles = files.stream() + .filter(file -> !((String) file.getField(0)).contains("regionkey=")) + .collect(toImmutableList()); + + List partitionedFiles = files.stream() + .filter(file -> ((String) file.getField(0)).contains("regionkey=")) + .collect(toImmutableList()); + + int expectedFileCount = computeActual("SELECT DISTINCT regionkey, substring(name, 1, 1) FROM nation WHERE nationkey < 10").getRowCount(); + assertThat(partitionedFiles).hasSize(expectedFileCount); + assertEquals(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 10L); + + assertThat(unpartitionedFiles).hasSize(1); + assertEquals((long) unpartitionedFiles.get(0).getField(1), 15); + + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); + // Most partitions have one record each. regionkey=2, trunc_name=I has two records, and 15 records are unpartitioned + assertQuery("SELECT record_count, count(*) FROM \"" + tableName + "$partitions\" GROUP BY record_count", "VALUES (1, 8), (2, 1), (15, 1)"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testAddPartitionColumn() + { + String tableName = "test_add_partition_column_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM nation WHERE nationkey < 10", 10); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['regionkey', 'truncate(name, 1)']"); + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", 15); + assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()).contains("partitioning = ARRAY['regionkey','truncate(name, 1)']"); + + List files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); + List initialFiles = files.stream() + .filter(file -> !((String) file.getField(0)).contains("name_trunc")) + .collect(toImmutableList()); + + List partitionedFiles = files.stream() + .filter(file -> ((String) file.getField(0)).contains("name_trunc")) + .collect(toImmutableList()); + + int expectedInitialFiles = toIntExact((long) computeActual("SELECT count(distinct regionkey) FROM nation WHERE nationkey < 10").getOnlyValue()); + assertThat(initialFiles).hasSize(expectedInitialFiles); + assertEquals(initialFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 10L); + + int expectedFinalFileCount = computeActual("SELECT DISTINCT regionkey, substring(name, 1, 1) FROM nation WHERE nationkey >= 10").getRowCount(); + assertThat(partitionedFiles).hasSize(expectedFinalFileCount); + assertEquals(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 15L); + + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); + assertUpdate("DROP TABLE " + tableName); + + assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['truncate(name, 1)']) AS SELECT * FROM nation WHERE nationkey < 10", 10); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['truncate(name, 1)', 'regionkey']"); + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", 15); + assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()).contains("partitioning = ARRAY['truncate(name, 1)','regionkey']"); + + files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); + initialFiles = files.stream() + .filter(file -> !((String) file.getField(0)).contains("regionkey=")) + .collect(toImmutableList()); + + partitionedFiles = files.stream() + .filter(file -> ((String) file.getField(0)).contains("regionkey=")) + .collect(toImmutableList()); + + expectedInitialFiles = toIntExact((long) computeActual("SELECT DISTINCT substring(name, 1, 1) FROM nation WHERE nationkey < 10").getRowCount()); + assertThat(initialFiles).hasSize(expectedInitialFiles); + assertEquals(initialFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 10L); + + expectedFinalFileCount = computeActual("SELECT DISTINCT regionkey, substring(name, 1, 1) FROM nation WHERE nationkey >= 10").getRowCount(); + assertThat(partitionedFiles).hasSize(expectedFinalFileCount); + assertEquals(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 15L); + + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testChangePartitionTransform() + { + String tableName = "test_change_partition_transform_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (ts, a) WITH (partitioning = ARRAY['year(ts)']) " + + "AS VALUES (TIMESTAMP '2021-01-01 01:01:01.111111', 1), (TIMESTAMP '2022-02-02 02:02:02.222222', 2), (TIMESTAMP '2023-03-03 03:03:03.333333', 3)", 3); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['month(ts)']"); + assertUpdate("INSERT INTO " + tableName + " VALUES (TIMESTAMP '2024-04-04 04:04:04.444444', 4), (TIMESTAMP '2025-05-05 05:05:05.555555', 5)", 2); + assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()).contains("partitioning = ARRAY['month(ts)']"); + + List files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); + List yearPartitionedFiles = files.stream() + .filter(file -> { + String filePath = ((String) file.getField(0)); + return filePath.contains("ts_year") && !filePath.contains("ts_month"); + }) + .collect(toImmutableList()); + + List monthPartitionedFiles = files.stream() + .filter(file -> { + String filePath = ((String) file.getField(0)); + return !filePath.contains("ts_year") && filePath.contains("ts_month"); + }) + .collect(toImmutableList()); + + assertThat(yearPartitionedFiles).hasSize(3); + assertThat(monthPartitionedFiles).hasSize(2); + assertUpdate("DROP TABLE " + tableName); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index e908ca055e7f..7462226d876f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -34,11 +34,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetWriter; @@ -53,6 +55,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.util.List; import java.util.UUID; import static com.google.common.io.MoreFiles.deleteRecursively; @@ -66,6 +69,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class TestIcebergV2 extends AbstractTestQueryFramework @@ -193,7 +197,7 @@ public void testDowngradingV2TableToV1Fails() assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25); assertEquals(loadTable(tableName).operations().current().formatVersion(), 2); assertThatThrownBy(() -> query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 1")) - .hasMessage("Failed to commit new table properties") + .hasMessage("Failed to set new property values") .getRootCause() .hasMessage("Cannot downgrade v2 table to v1"); } @@ -208,6 +212,50 @@ public void testUpgradingToInvalidVersionFails() .hasMessage("Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 2"); } + @Test + public void testUpdatingAllTableProperties() + { + String tableName = "test_updating_all_table_properties_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1, format = 'ORC') AS SELECT * FROM tpch.tiny.nation", 25); + BaseTable table = loadTable(tableName); + assertEquals(table.operations().current().formatVersion(), 1); + assertTrue(table.properties().get(TableProperties.DEFAULT_FILE_FORMAT).equalsIgnoreCase("ORC")); + assertTrue(table.spec().isUnpartitioned()); + + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2, partitioning = ARRAY['regionkey'], format = 'PARQUET'"); + table = loadTable(tableName); + assertEquals(table.operations().current().formatVersion(), 2); + assertTrue(table.properties().get(TableProperties.DEFAULT_FILE_FORMAT).equalsIgnoreCase("PARQUET")); + assertTrue(table.spec().isPartitioned()); + List partitionFields = table.spec().fields(); + assertThat(partitionFields).hasSize(1); + assertEquals(partitionFields.get(0).name(), "regionkey"); + assertTrue(partitionFields.get(0).transform().isIdentity()); + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); + } + + @Test + public void testUnsettingAllTableProperties() + { + String tableName = "test_unsetting_all_table_properties_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1, format = 'PARQUET', partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25); + BaseTable table = loadTable(tableName); + assertEquals(table.operations().current().formatVersion(), 1); + assertTrue(table.properties().get(TableProperties.DEFAULT_FILE_FORMAT).equalsIgnoreCase("PARQUET")); + assertTrue(table.spec().isPartitioned()); + List partitionFields = table.spec().fields(); + assertThat(partitionFields).hasSize(1); + assertEquals(partitionFields.get(0).name(), "regionkey"); + assertTrue(partitionFields.get(0).transform().isIdentity()); + + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = DEFAULT, format = DEFAULT, partitioning = DEFAULT"); + table = loadTable(tableName); + assertEquals(table.operations().current().formatVersion(), 2); + assertTrue(table.properties().get(TableProperties.DEFAULT_FILE_FORMAT).equalsIgnoreCase("ORC")); + assertTrue(table.spec().isUnpartitioned()); + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); + } + private void writeEqualityDeleteToNationTable(Table icebergTable) throws Exception {