diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java index 540af43658cf..0eb64475cfb0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java @@ -58,8 +58,6 @@ import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED; -import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY; -import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_FPP_KEY; import static io.trino.plugin.iceberg.IcebergSessionProperties.getCompressionCodec; import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcStringStatisticsLimit; import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcWriterMaxDictionaryMemory; @@ -72,12 +70,13 @@ import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageSize; import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageValueCount; import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate; -import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP; +import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY; +import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterColumns; +import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterFpp; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; import static io.trino.plugin.iceberg.util.OrcTypeConverter.toOrcType; import static io.trino.plugin.iceberg.util.PrimitiveTypeMapBuilder.makeTypeMap; import static 
io.trino.spi.StandardErrorCode.NOT_SUPPORTED; -import static java.lang.Double.parseDouble; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE; @@ -270,18 +269,18 @@ private IcebergFileWriter createOrcWriter( public static OrcWriterOptions withBloomFilterOptions(OrcWriterOptions orcWriterOptions, Map storageProperties) { - if (storageProperties.containsKey(ORC_BLOOM_FILTER_COLUMNS_KEY)) { + Optional orcBloomFilterColumns = getOrcBloomFilterColumns(storageProperties); + Optional orcBloomFilterFpp = getOrcBloomFilterFpp(storageProperties); + if (orcBloomFilterColumns.isPresent()) { try { - double fpp = storageProperties.containsKey(ORC_BLOOM_FILTER_FPP_KEY) - ? parseDouble(storageProperties.get(ORC_BLOOM_FILTER_FPP_KEY)) - : orcWriterOptions.getBloomFilterFpp(); + double fpp = orcBloomFilterFpp.map(Double::parseDouble).orElseGet(orcWriterOptions::getBloomFilterFpp); return OrcWriterOptions.builderFrom(orcWriterOptions) - .setBloomFilterColumns(ImmutableSet.copyOf(COLUMN_NAMES_SPLITTER.splitToList(storageProperties.get(ORC_BLOOM_FILTER_COLUMNS_KEY)))) + .setBloomFilterColumns(ImmutableSet.copyOf(COLUMN_NAMES_SPLITTER.splitToList(orcBloomFilterColumns.get()))) .setBloomFilterFpp(fpp) .build(); } catch (NumberFormatException e) { - throw new TrinoException(ICEBERG_INVALID_METADATA, format("Invalid value for %s property: %s", ORC_BLOOM_FILTER_FPP, storageProperties.get(ORC_BLOOM_FILTER_FPP_KEY))); + throw new TrinoException(ICEBERG_INVALID_METADATA, format("Invalid value for %s property: %s", ORC_BLOOM_FILTER_FPP_PROPERTY, orcBloomFilterFpp.get())); } } return orcWriterOptions; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index b7c450a0623a..8f45318eac41 100644 --- 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -314,9 +314,6 @@ public class IcebergMetadata private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN"; public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(FILE_FORMAT_PROPERTY, FORMAT_VERSION_PROPERTY, PARTITIONING_PROPERTY, SORTED_BY_PROPERTY); - public static final String ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns"; - public static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp"; - public static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES"; private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java index 9de78f2280e1..ae16225cc0af 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java @@ -43,8 +43,8 @@ public class IcebergTableProperties public static final String SORTED_BY_PROPERTY = "sorted_by"; public static final String LOCATION_PROPERTY = "location"; public static final String FORMAT_VERSION_PROPERTY = "format_version"; - public static final String ORC_BLOOM_FILTER_COLUMNS = "orc_bloom_filter_columns"; - public static final String ORC_BLOOM_FILTER_FPP = "orc_bloom_filter_fpp"; + public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns"; + public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp"; private final List> tableProperties; @@ -90,7 +90,7 @@ public IcebergTableProperties( IcebergTableProperties::validateFormatVersion, false)) .add(new PropertyMetadata<>( - 
ORC_BLOOM_FILTER_COLUMNS, + ORC_BLOOM_FILTER_COLUMNS_PROPERTY, "ORC Bloom filter index columns", new ArrayType(VARCHAR), List.class, @@ -102,7 +102,7 @@ public IcebergTableProperties( .collect(toImmutableList()), value -> value)) .add(doubleProperty( - ORC_BLOOM_FILTER_FPP, + ORC_BLOOM_FILTER_FPP_PROPERTY, "ORC Bloom filter false positive probability", orcWriterConfig.getDefaultBloomFilterFpp(), IcebergTableProperties::validateOrcBloomFilterFpp, @@ -154,13 +154,13 @@ private static void validateFormatVersion(int version) public static List getOrcBloomFilterColumns(Map tableProperties) { - List orcBloomFilterColumns = (List) tableProperties.get(ORC_BLOOM_FILTER_COLUMNS); + List orcBloomFilterColumns = (List) tableProperties.get(ORC_BLOOM_FILTER_COLUMNS_PROPERTY); return orcBloomFilterColumns == null ? ImmutableList.of() : ImmutableList.copyOf(orcBloomFilterColumns); } public static Double getOrcBloomFilterFpp(Map tableProperties) { - return (Double) tableProperties.get(ORC_BLOOM_FILTER_FPP); + return (Double) tableProperties.get(ORC_BLOOM_FILTER_FPP_PROPERTY); } private static void validateOrcBloomFilterFpp(double fpp) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index a9dd14485ab7..2990723d62fe 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -80,6 +80,7 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -103,17 +104,13 @@ import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE; -import static 
io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY; -import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_FPP_KEY; import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.LOCATION_PROPERTY; -import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS; -import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP; +import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS_PROPERTY; +import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY; -import static io.trino.plugin.iceberg.IcebergTableProperties.getOrcBloomFilterColumns; -import static io.trino.plugin.iceberg.IcebergTableProperties.getOrcBloomFilterFpp; import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; import static io.trino.plugin.iceberg.IcebergTableProperties.getSortOrder; import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields; @@ -159,6 +156,8 @@ import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED; import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT; import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; +import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS; +import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP; import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; @@ -175,6 +174,10 @@ public final class IcebergUtil public 
static final String METADATA_FOLDER_NAME = "metadata"; public static final String METADATA_FILE_EXTENSION = ".metadata.json"; public static final String TRINO_QUERY_ID_NAME = "trino_query_id"; + // For backward compatibility only. DO NOT USE. + private static final String BROKEN_ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp"; + // For backward compatibility only. DO NOT USE. + private static final String BROKEN_ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns"; private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*"); // Metadata file name examples // - 00001-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json @@ -242,18 +245,41 @@ public static Map getIcebergTableProperties(Table icebergTable) properties.put(FORMAT_VERSION_PROPERTY, formatVersion); // iceberg ORC format bloom filter properties - String orcBloomFilterColumns = icebergTable.properties().get(ORC_BLOOM_FILTER_COLUMNS_KEY); - if (orcBloomFilterColumns != null) { - properties.put(ORC_BLOOM_FILTER_COLUMNS, Splitter.on(',').trimResults().omitEmptyStrings().splitToList(orcBloomFilterColumns)); + Optional orcBloomFilterColumns = getOrcBloomFilterColumns(icebergTable.properties()); + if (orcBloomFilterColumns.isPresent()) { + properties.put(ORC_BLOOM_FILTER_COLUMNS_PROPERTY, Splitter.on(',').trimResults().omitEmptyStrings().splitToList(orcBloomFilterColumns.get())); } - String orcBloomFilterFpp = icebergTable.properties().get(ORC_BLOOM_FILTER_FPP_KEY); - if (orcBloomFilterFpp != null) { - properties.put(ORC_BLOOM_FILTER_FPP, Double.parseDouble(orcBloomFilterFpp)); + // iceberg ORC format bloom filter fpp property + Optional orcBloomFilterFpp = getOrcBloomFilterFpp(icebergTable.properties()); + if (orcBloomFilterFpp.isPresent()) { + properties.put(ORC_BLOOM_FILTER_FPP_PROPERTY, Double.parseDouble(orcBloomFilterFpp.get())); } return properties.buildOrThrow(); } + // Versions 382-438 set incorrect table properties:
https://github.com/trinodb/trino/commit/b89aac68c43e5392f23b8d6ba053bbeb6df85028#diff-2af3e19a6b656640a7d0bb73114ef224953a2efa04e569b1fe4da953b2cc6d15R418-R419 + // `orc.bloom.filter.columns` was set instead of `write.orc.bloom.filter.columns`, and `orc.bloom.filter.fpp` instead of `write.orc.bloom.filter.fpp` + // These methods maintain backward compatibility for existing tables. + public static Optional getOrcBloomFilterColumns(Map properties) + { + Optional orcBloomFilterColumns = Stream.of( + properties.get(ORC_BLOOM_FILTER_COLUMNS), + properties.get(BROKEN_ORC_BLOOM_FILTER_COLUMNS_KEY)) + .filter(Objects::nonNull) + .findFirst(); + return orcBloomFilterColumns; + } + + public static Optional getOrcBloomFilterFpp(Map properties) + { + return Stream.of( + properties.get(ORC_BLOOM_FILTER_FPP), + properties.get(BROKEN_ORC_BLOOM_FILTER_FPP_KEY)) + .filter(Objects::nonNull) + .findFirst(); + } + public static List getColumns(Schema schema, TypeManager typeManager) { return schema.columns().stream() @@ -659,12 +685,12 @@ public static Map createTableProperties(ConnectorTableMetadata t propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties()))); // iceberg ORC format bloom filter properties used by create table - List columns = getOrcBloomFilterColumns(tableMetadata.getProperties()); + List columns = IcebergTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties()); if (!columns.isEmpty()) { - checkFormatForProperty(fileFormat.toIceberg(), FileFormat.ORC, ORC_BLOOM_FILTER_COLUMNS); + checkFormatForProperty(fileFormat.toIceberg(), FileFormat.ORC, ORC_BLOOM_FILTER_COLUMNS_PROPERTY); validateOrcBloomFilterColumns(tableMetadata, columns); - propertiesBuilder.put(ORC_BLOOM_FILTER_COLUMNS_KEY, Joiner.on(",").join(columns)); - propertiesBuilder.put(ORC_BLOOM_FILTER_FPP_KEY, String.valueOf(getOrcBloomFilterFpp(tableMetadata.getProperties()))); + propertiesBuilder.put(ORC_BLOOM_FILTER_COLUMNS,
Joiner.on(",").join(columns)); + propertiesBuilder.put(ORC_BLOOM_FILTER_FPP, String.valueOf(IcebergTableProperties.getOrcBloomFilterFpp(tableMetadata.getProperties()))); } if (tableMetadata.getComment().isPresent()) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 9849b9ecc08c..4fa4047efde6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -32,6 +32,7 @@ import io.trino.plugin.hive.HiveCompressionCodec; import io.trino.plugin.hive.TestingHivePlugin; import io.trino.plugin.iceberg.fileio.ForwardingFileIo; +import io.trino.plugin.iceberg.fileio.ForwardingOutputFile; import io.trino.server.DynamicFilterService; import io.trino.spi.QueryId; import io.trino.spi.connector.ColumnHandle; @@ -152,6 +153,7 @@ import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; +import static org.apache.iceberg.TableMetadata.newTableMetadata; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.offset; @@ -4714,6 +4716,36 @@ protected void verifyIcebergTableProperties(MaterializedResult actual) assertEqualsIgnoreOrder(actual.getMaterializedRows(), expected.getMaterializedRows()); } + @Test + public void testGetIcebergTableWithLegacyOrcBloomFilterProperties() + throws Exception + { + String tableName = "test_get_table_with_legacy_orc_bloom_filter_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1); + + String tableLocation = getTableLocation(tableName); + String metadataLocation = getLatestMetadataLocation(fileSystem, tableLocation); + + 
TableMetadata tableMetadata = TableMetadataParser.read(new ForwardingFileIo(fileSystem), metadataLocation); + ImmutableMap newProperties = ImmutableMap.builder() + .putAll(tableMetadata.properties()) + .put("orc.bloom.filter.columns", "x,y") // legacy incorrect property + .put("orc.bloom.filter.fpp", "0.2") // legacy incorrect property + .buildOrThrow(); + TableMetadata newTableMetadata = newTableMetadata( + tableMetadata.schema(), + tableMetadata.spec(), + tableMetadata.sortOrder(), + tableMetadata.location(), + newProperties); + TableMetadataParser.overwrite(newTableMetadata, new ForwardingOutputFile(fileSystem, Location.of(metadataLocation))); + + MaterializedResult actualCreateTable = computeActual("SHOW CREATE TABLE " + tableName); + assertThat(actualCreateTable).isNotNull(); + assertThat(actualCreateTable.getMaterializedRows().getFirst().toString()) + .contains(ImmutableList.of("orc_bloom_filter_columns", "orc_bloom_filter_fpp")); + } + protected abstract boolean supportsIcebergFileStatistics(String typeName); @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcWithBloomFilters.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcWithBloomFilters.java index 38d3652852ef..ddc5606e7361 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcWithBloomFilters.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcWithBloomFilters.java @@ -13,10 +13,17 @@ */ package io.trino.plugin.iceberg; +import com.google.common.collect.ImmutableList; import io.trino.testing.BaseOrcWithBloomFiltersTest; +import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; +import org.junit.jupiter.api.Test; +import static io.trino.testing.MaterializedResult.resultBuilder; +import static io.trino.testing.QueryAssertions.assertContains; +import static io.trino.testing.TestingNames.randomNameSuffix; import static java.lang.String.format; 
+import static org.assertj.core.api.Assertions.assertThat; public class TestIcebergOrcWithBloomFilters extends BaseOrcWithBloomFiltersTest @@ -39,4 +46,26 @@ protected String getTableProperties(String bloomFilterColumnName, String bucketi bloomFilterColumnName, bucketingColumnName); } + + @Test + public void testBloomFilterPropertiesArePersistedDuringCreate() + { + String tableName = "test_metadata_write_properties_" + randomNameSuffix(); + assertQuerySucceeds("CREATE TABLE " + tableName + " (a bigint, b bigint, c bigint) WITH (" + + "format = 'orc'," + + "orc_bloom_filter_columns = array['a','b']," + + "orc_bloom_filter_fpp = 0.1)"); + + MaterializedResult actualProperties = computeActual("SELECT * FROM \"" + tableName + "$properties\""); + assertThat(actualProperties).isNotNull(); + MaterializedResult expectedProperties = resultBuilder(getSession()) + .row("write.orc.bloom.filter.columns", "a,b") + .row("write.orc.bloom.filter.fpp", "0.1").build(); + assertContains(actualProperties, expectedProperties); + + MaterializedResult actualCreateTable = computeActual("SHOW CREATE TABLE " + tableName); + assertThat(actualCreateTable).isNotNull(); + assertThat(actualCreateTable.getMaterializedRows().getFirst().toString()) + .contains(ImmutableList.of("orc_bloom_filter_columns", "orc_bloom_filter_fpp")); + } }