Skip to content

Commit

Permalink
Fix table props for Iceberg with orc_bloom_filter
Browse files Browse the repository at this point in the history
  • Loading branch information
oneonestar authored and wendigo committed Feb 29, 2024
1 parent bc97ae8 commit 76fe740
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_FPP_KEY;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getCompressionCodec;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcStringStatisticsLimit;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcWriterMaxDictionaryMemory;
Expand All @@ -72,12 +70,13 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageSize;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageValueCount;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterFpp;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.plugin.iceberg.util.OrcTypeConverter.toOrcType;
import static io.trino.plugin.iceberg.util.PrimitiveTypeMapBuilder.makeTypeMap;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Double.parseDouble;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
Expand Down Expand Up @@ -270,18 +269,18 @@ private IcebergFileWriter createOrcWriter(

public static OrcWriterOptions withBloomFilterOptions(OrcWriterOptions orcWriterOptions, Map<String, String> storageProperties)
{
if (storageProperties.containsKey(ORC_BLOOM_FILTER_COLUMNS_KEY)) {
Optional<String> orcBloomFilterColumns = getOrcBloomFilterColumns(storageProperties);
Optional<String> orcBloomFilterFpp = getOrcBloomFilterFpp(storageProperties);
if (orcBloomFilterColumns.isPresent()) {
try {
double fpp = storageProperties.containsKey(ORC_BLOOM_FILTER_FPP_KEY)
? parseDouble(storageProperties.get(ORC_BLOOM_FILTER_FPP_KEY))
: orcWriterOptions.getBloomFilterFpp();
double fpp = orcBloomFilterFpp.map(Double::parseDouble).orElseGet(orcWriterOptions::getBloomFilterFpp);
return OrcWriterOptions.builderFrom(orcWriterOptions)
.setBloomFilterColumns(ImmutableSet.copyOf(COLUMN_NAMES_SPLITTER.splitToList(storageProperties.get(ORC_BLOOM_FILTER_COLUMNS_KEY))))
.setBloomFilterColumns(ImmutableSet.copyOf(COLUMN_NAMES_SPLITTER.splitToList(orcBloomFilterColumns.get())))
.setBloomFilterFpp(fpp)
.build();
}
catch (NumberFormatException e) {
throw new TrinoException(ICEBERG_INVALID_METADATA, format("Invalid value for %s property: %s", ORC_BLOOM_FILTER_FPP, storageProperties.get(ORC_BLOOM_FILTER_FPP_KEY)));
throw new TrinoException(ICEBERG_INVALID_METADATA, format("Invalid value for %s property: %s", ORC_BLOOM_FILTER_FPP_PROPERTY, orcBloomFilterFpp.get()));
}
}
return orcWriterOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,6 @@ public class IcebergMetadata
private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN";
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(FILE_FORMAT_PROPERTY, FORMAT_VERSION_PROPERTY, PARTITIONING_PROPERTY, SORTED_BY_PROPERTY);

public static final String ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns";
public static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";

public static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES";
private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class IcebergTableProperties
public static final String SORTED_BY_PROPERTY = "sorted_by";
public static final String LOCATION_PROPERTY = "location";
public static final String FORMAT_VERSION_PROPERTY = "format_version";
public static final String ORC_BLOOM_FILTER_COLUMNS = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP = "orc_bloom_filter_fpp";
public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";

private final List<PropertyMetadata<?>> tableProperties;

Expand Down Expand Up @@ -90,7 +90,7 @@ public IcebergTableProperties(
IcebergTableProperties::validateFormatVersion,
false))
.add(new PropertyMetadata<>(
ORC_BLOOM_FILTER_COLUMNS,
ORC_BLOOM_FILTER_COLUMNS_PROPERTY,
"ORC Bloom filter index columns",
new ArrayType(VARCHAR),
List.class,
Expand All @@ -102,7 +102,7 @@ public IcebergTableProperties(
.collect(toImmutableList()),
value -> value))
.add(doubleProperty(
ORC_BLOOM_FILTER_FPP,
ORC_BLOOM_FILTER_FPP_PROPERTY,
"ORC Bloom filter false positive probability",
orcWriterConfig.getDefaultBloomFilterFpp(),
IcebergTableProperties::validateOrcBloomFilterFpp,
Expand Down Expand Up @@ -154,13 +154,13 @@ private static void validateFormatVersion(int version)

public static List<String> getOrcBloomFilterColumns(Map<String, Object> tableProperties)
{
List<String> orcBloomFilterColumns = (List<String>) tableProperties.get(ORC_BLOOM_FILTER_COLUMNS);
List<String> orcBloomFilterColumns = (List<String>) tableProperties.get(ORC_BLOOM_FILTER_COLUMNS_PROPERTY);
return orcBloomFilterColumns == null ? ImmutableList.of() : ImmutableList.copyOf(orcBloomFilterColumns);
}

public static Double getOrcBloomFilterFpp(Map<String, Object> tableProperties)
{
return (Double) tableProperties.get(ORC_BLOOM_FILTER_FPP);
return (Double) tableProperties.get(ORC_BLOOM_FILTER_FPP_PROPERTY);
}

private static void validateOrcBloomFilterFpp(double fpp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand All @@ -103,17 +104,13 @@
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_FPP_KEY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getOrcBloomFilterColumns;
import static io.trino.plugin.iceberg.IcebergTableProperties.getOrcBloomFilterFpp;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergTableProperties.getSortOrder;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
Expand Down Expand Up @@ -159,6 +156,8 @@
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
Expand All @@ -175,6 +174,10 @@ public final class IcebergUtil
public static final String METADATA_FOLDER_NAME = "metadata";
public static final String METADATA_FILE_EXTENSION = ".metadata.json";
public static final String TRINO_QUERY_ID_NAME = "trino_query_id";
// For backward compatibility only. DO NOT USE.
private static final String BROKEN_ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";
// For backward compatibility only. DO NOT USE.
private static final String BROKEN_ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns";
private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*");
// Metadata file name examples
// - 00001-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json
Expand Down Expand Up @@ -242,18 +245,41 @@ public static Map<String, Object> getIcebergTableProperties(Table icebergTable)
properties.put(FORMAT_VERSION_PROPERTY, formatVersion);

// iceberg ORC format bloom filter properties
String orcBloomFilterColumns = icebergTable.properties().get(ORC_BLOOM_FILTER_COLUMNS_KEY);
if (orcBloomFilterColumns != null) {
properties.put(ORC_BLOOM_FILTER_COLUMNS, Splitter.on(',').trimResults().omitEmptyStrings().splitToList(orcBloomFilterColumns));
Optional<String> orcBloomFilterColumns = getOrcBloomFilterColumns(icebergTable.properties());
if (orcBloomFilterColumns.isPresent()) {
properties.put(ORC_BLOOM_FILTER_COLUMNS_PROPERTY, Splitter.on(',').trimResults().omitEmptyStrings().splitToList(orcBloomFilterColumns.get()));
}
String orcBloomFilterFpp = icebergTable.properties().get(ORC_BLOOM_FILTER_FPP_KEY);
if (orcBloomFilterFpp != null) {
properties.put(ORC_BLOOM_FILTER_FPP, Double.parseDouble(orcBloomFilterFpp));
// iceberg ORC format bloom filter properties
Optional<String> orcBloomFilterFpp = getOrcBloomFilterFpp(icebergTable.properties());
if (orcBloomFilterFpp.isPresent()) {
properties.put(ORC_BLOOM_FILTER_FPP_PROPERTY, Double.parseDouble(orcBloomFilterFpp.get()));
}

return properties.buildOrThrow();
}

// Version 382-438 set incorrect table properties: https://github.com/trinodb/trino/commit/b89aac68c43e5392f23b8d6ba053bbeb6df85028#diff-2af3e19a6b656640a7d0bb73114ef224953a2efa04e569b1fe4da953b2cc6d15R418-R419
// `orc.bloom.filter.columns` was set instead of `write.orc.bloom.filter.columns`, and `orc.bloom.filter.fpp` instead of `write.orc.bloom.filter.fpp`
// These methods maintain backward compatibility for existing table.
public static Optional<String> getOrcBloomFilterColumns(Map<String, String> properties)
{
Optional<String> orcBloomFilterColumns = Stream.of(
properties.get(ORC_BLOOM_FILTER_COLUMNS),
properties.get(BROKEN_ORC_BLOOM_FILTER_COLUMNS_KEY))
.filter(Objects::nonNull)
.findFirst();
return orcBloomFilterColumns;
}

public static Optional<String> getOrcBloomFilterFpp(Map<String, String> properties)
{
return Stream.of(
properties.get(ORC_BLOOM_FILTER_FPP),
properties.get(BROKEN_ORC_BLOOM_FILTER_FPP_KEY))
.filter(Objects::nonNull)
.findFirst();
}

public static List<IcebergColumnHandle> getColumns(Schema schema, TypeManager typeManager)
{
return schema.columns().stream()
Expand Down Expand Up @@ -659,12 +685,12 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties())));

// iceberg ORC format bloom filter properties used by create table
List<String> columns = getOrcBloomFilterColumns(tableMetadata.getProperties());
List<String> columns = IcebergTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties());
if (!columns.isEmpty()) {
checkFormatForProperty(fileFormat.toIceberg(), FileFormat.ORC, ORC_BLOOM_FILTER_COLUMNS);
checkFormatForProperty(fileFormat.toIceberg(), FileFormat.ORC, ORC_BLOOM_FILTER_COLUMNS_PROPERTY);
validateOrcBloomFilterColumns(tableMetadata, columns);
propertiesBuilder.put(ORC_BLOOM_FILTER_COLUMNS_KEY, Joiner.on(",").join(columns));
propertiesBuilder.put(ORC_BLOOM_FILTER_FPP_KEY, String.valueOf(getOrcBloomFilterFpp(tableMetadata.getProperties())));
propertiesBuilder.put(ORC_BLOOM_FILTER_COLUMNS, Joiner.on(",").join(columns));
propertiesBuilder.put(ORC_BLOOM_FILTER_FPP, String.valueOf(IcebergTableProperties.getOrcBloomFilterFpp(tableMetadata.getProperties())));
}

if (tableMetadata.getComment().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.TestingHivePlugin;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
import io.trino.server.DynamicFilterService;
import io.trino.spi.QueryId;
import io.trino.spi.connector.ColumnHandle;
Expand Down Expand Up @@ -152,6 +153,7 @@
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static org.apache.iceberg.TableMetadata.newTableMetadata;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.offset;
Expand Down Expand Up @@ -4714,6 +4716,36 @@ protected void verifyIcebergTableProperties(MaterializedResult actual)
assertEqualsIgnoreOrder(actual.getMaterializedRows(), expected.getMaterializedRows());
}

@Test
public void testGetIcebergTableWithLegacyOrcBloomFilterProperties()
throws Exception
{
String tableName = "test_get_table_with_legacy_orc_bloom_filter_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);

String tableLocation = getTableLocation(tableName);
String metadataLocation = getLatestMetadataLocation(fileSystem, tableLocation);

TableMetadata tableMetadata = TableMetadataParser.read(new ForwardingFileIo(fileSystem), metadataLocation);
ImmutableMap<String, String> newProperties = ImmutableMap.<String, String>builder()
.putAll(tableMetadata.properties())
.put("orc.bloom.filter.columns", "x,y") // legacy incorrect property
.put("orc.bloom.filter.fpp", "0.2") // legacy incorrect property
.buildOrThrow();
TableMetadata newTableMetadata = newTableMetadata(
tableMetadata.schema(),
tableMetadata.spec(),
tableMetadata.sortOrder(),
tableMetadata.location(),
newProperties);
TableMetadataParser.overwrite(newTableMetadata, new ForwardingOutputFile(fileSystem, Location.of(metadataLocation)));

MaterializedResult actualCreateTable = computeActual("SHOW CREATE TABLE " + tableName);
assertThat(actualCreateTable).isNotNull();
assertThat(actualCreateTable.getMaterializedRows().getFirst().toString())
.contains(ImmutableList.of("orc_bloom_filter_columns", "orc_bloom_filter_fpp"));
}

protected abstract boolean supportsIcebergFileStatistics(String typeName);

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.testing.BaseOrcWithBloomFiltersTest;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import org.junit.jupiter.api.Test;

import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.QueryAssertions.assertContains;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;

public class TestIcebergOrcWithBloomFilters
extends BaseOrcWithBloomFiltersTest
Expand All @@ -39,4 +46,26 @@ protected String getTableProperties(String bloomFilterColumnName, String bucketi
bloomFilterColumnName,
bucketingColumnName);
}

@Test
public void testBloomFilterPropertiesArePersistedDuringCreate()
{
String tableName = "test_metadata_write_properties_" + randomNameSuffix();
assertQuerySucceeds("CREATE TABLE " + tableName + " (a bigint, b bigint, c bigint) WITH (" +
"format = 'orc'," +
"orc_bloom_filter_columns = array['a','b']," +
"orc_bloom_filter_fpp = 0.1)");

MaterializedResult actualProperties = computeActual("SELECT * FROM \"" + tableName + "$properties\"");
assertThat(actualProperties).isNotNull();
MaterializedResult expectedProperties = resultBuilder(getSession())
.row("write.orc.bloom.filter.columns", "a,b")
.row("write.orc.bloom.filter.fpp", "0.1").build();
assertContains(actualProperties, expectedProperties);

MaterializedResult actualCreateTable = computeActual("SHOW CREATE TABLE " + tableName);
assertThat(actualCreateTable).isNotNull();
assertThat(actualCreateTable.getMaterializedRows().getFirst().toString())
.contains(ImmutableList.of("orc_bloom_filter_columns", "orc_bloom_filter_fpp"));
}
}

0 comments on commit 76fe740

Please sign in to comment.