Add Parquet bloom filter write support to Iceberg connector #21602
Conversation
We should add some compatibility testing with Spark in TestIcebergSparkCompatibility to verify that it honors the bloom filter property set by us and can read files with bloom filters written by us.
Other than that lgtm
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java (resolved comment)
@leetcode-1533 FYI
@jkylling gentle reminder about going forward with this contribution.
Force-pushed from 5a44eb2 to 619b0d9 (Compare)
Added a product test which verifies that Trino and Spark can read the Iceberg tables written by each other when the Bloom filter table properties are set. I've not verified whether the files written by Spark contain Bloom filters. Here's a little rant about the experience of writing product tests for this, with the hope that it might help improve the experience (there were more steps involved than the ones below):
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java (outdated; resolved comment)
Force-pushed from 619b0d9 to 2c5d886 (Compare)
{
    return properties.entrySet().stream()
            .filter(entry -> entry.getKey().startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX) && "true".equals(entry.getValue()))
            .map(entry -> entry.getKey().substring(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX.length()))
Do we need to lowercase the column names?
We'd probably need a Spark compatibility test using case-sensitive column names to check this.
I see there's already testSparkReadingTrinoBloomFilters
@@ -45,6 +45,7 @@ public class IcebergTableProperties
    public static final String FORMAT_VERSION_PROPERTY = "format_version";
    public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
    public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
    public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
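For context, connector table properties like this are declared via PropertyMetadata. A minimal sketch of how the new property might be registered, assuming it mirrors the existing orc_bloom_filter_columns declaration in IcebergTableProperties (the actual PR code may differ):

    import static com.google.common.collect.ImmutableList.toImmutableList;
    import static io.trino.spi.type.VarcharType.VARCHAR;

    tableProperties.add(new PropertyMetadata<>(
            PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY,
            "Parquet Bloom filter index columns",
            new ArrayType(VARCHAR),
            List.class,
            ImmutableList.of(),
            false,
            // decoder: validate and copy the user-supplied list of column names
            value -> ((List<?>) value).stream()
                    .map(String.class::cast)
                    .collect(toImmutableList()),
            value -> value));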
spark-sql (default)> CREATE TABLE t1 (testInteger INTEGER, testLong BIGINT, testString STRING, testDouble DOUBLE, testFloat REAL)
> USING iceberg
> TBLPROPERTIES (
> 'write.parquet.bloom-filter-enabled.column.testInteger' = true,
> 'write.parquet.bloom-filter-enabled.column.testLong' = true,
> 'write.parquet.bloom-filter-enabled.column.testString' = true,
> 'write.parquet.bloom-filter-enabled.column.testDouble' = true,
> 'write.parquet.bloom-filter-enabled.column.testFloat' = true
> );
trino> show create table iceberg.default.t1;
Create Table
------------------------------------------------------------------
CREATE TABLE iceberg.default.t1 (
testinteger integer,
testlong bigint,
teststring varchar,
testdouble double,
testfloat real
)
WITH (
format = 'PARQUET',
format_version = 2,
location = 'hdfs://hadoop-master:9000/user/hive/warehouse/t1'
)
(1 row)
Shouldn't we see the bloom filter columns in SHOW CREATE TABLE, now that we're dealing with a supported table property?
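For illustration, the WITH clause might then look like this (the exact rendering is an assumption):

WITH (
   format = 'PARQUET',
   format_version = 2,
   location = 'hdfs://hadoop-master:9000/user/hive/warehouse/t1',
   parquet_bloom_filter_columns = ARRAY['testinteger', 'testlong', 'teststring', 'testdouble', 'testfloat']
)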
Modify io.trino.plugin.iceberg.IcebergUtil#getIcebergTableProperties
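Something along these lines, as a sketch (assuming getIcebergTableProperties builds an ImmutableMap of table properties, as the existing format and format_version entries suggest; the helper name getParquetBloomFilterColumns follows this PR, the rest is illustrative):

    Set<String> parquetBloomFilterColumns = getParquetBloomFilterColumns(icebergTable.properties());
    if (!parquetBloomFilterColumns.isEmpty()) {
        // surface the columns so SHOW CREATE TABLE round-trips the property
        properties.put(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY, ImmutableList.copyOf(parquetBloomFilterColumns));
    }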
I tried the following on the above scaffolding:
SELECT COUNT(*) FROM iceberg.default.t1 where testInteger in (9444, -88777, 6711111);
and saw the following:
"queryStats" : {
....
"physicalInputDataSize" : "656400B",
"failedPhysicalInputDataSize" : "0B",
"physicalInputPositions" : 5,
This does not seem to match the expectations from io.trino.testing.BaseTestParquetWithBloomFilters#testBloomFilterRowGroupPruning(io.trino.spi.connector.CatalogSchemaTableName, java.lang.String)
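For comparison, in query runner tests the pruning is typically asserted roughly like this (a sketch using AbstractTestQueryFramework#assertQueryStats; the table name and expected counts are placeholders):

    assertQueryStats(
            getSession(),
            "SELECT count(*) FROM t1 WHERE testInteger IN (9444, -88777, 6711111)",
            // with effective bloom filters every row group is pruned,
            // so the scan should read (close to) zero positions
            queryStats -> assertThat(queryStats.getPhysicalInputPositions()).isEqualTo(0),
            results -> assertThat(results.getOnlyValue()).isEqualTo(0L));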
We could add a toLowerCase to getParquetBloomFilterColumns to handle this. It looks like we have the same issue for the Iceberg ORC Bloom filters. Should we handle case sensitivity in this PR, or in a follow-up?
Let's fix the functionality in the existing PR rather than deliver half-baked functionality which may potentially backfire with bugs.
An alternative with fewer headaches would be to register a pre-created resource table and check the query stats on it, similar to what has been done in https://github.com/trinodb/trino/blob/ca209630136eabda2449594ef2b6a4d82fb9c2e5/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergReadVersionedTableByTemporal.java
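If we go that route, registering the pre-created table would look roughly like this (a sketch; the location and table name are placeholders, and it assumes the register_table procedure is enabled for the catalog):

trino> CALL iceberg.system.register_table(
    ->     schema_name => 'default',
    ->     table_name => 'bloom_filter_table',
    ->     table_location => 'hdfs://hadoop-master:9000/user/hive/warehouse/bloom_filter_table');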
Easy access to this would be useful in the product tests. It would allow the product tests in this PR to give more coverage. Unfortunately, product tests are not my cup of tea for Friday hacking 😅
We need a mechanism to get the query stats in the product tests to ensure that the bloom filter is actually effective and that we don't introduce regressions while refactoring.
Would someone be able to help add this logic? I don't have much experience with the product tests and unfortunately don't have much capacity to follow up on this at the moment. It would be much appreciated!
@findinpath aren't we already testing the effectiveness of the bloom filter in query runner tests? I'm not sure we should block this PR over checking this in product tests as well; we don't do that for bloom filters in the Hive connector either.
@@ -2924,6 +2924,67 @@ public void testSparkAlterStructColumnType(StorageFormat storageFormat)
        onSpark().executeQuery("DROP TABLE " + sparkTableName);
    }

    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
    public void testSparkReadingTrinoBloomFilters()
Please also add a test which creates the table through Spark with case-sensitive column names.
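A rough sketch of such a test, following the conventions in TestIcebergSparkCompatibility (helpers like sparkTableName/trinoTableName and the onSpark()/onTrino() executors follow the existing tests; the assertion only checks the round trip, not that the files actually contain bloom filters):

    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
    public void testTrinoWritingBloomFiltersForSparkCaseSensitiveColumns()
    {
        String baseTableName = "test_spark_case_sensitive_bloom_filters";
        // Spark preserves the column name's case in the table property key
        onSpark().executeQuery(format(
                "CREATE TABLE %s (testInteger INT) USING ICEBERG " +
                        "TBLPROPERTIES ('write.parquet.bloom-filter-enabled.column.testInteger' = true)",
                sparkTableName(baseTableName)));
        // Trino lowercases identifiers, so this write exercises the mismatch
        onTrino().executeQuery(format("INSERT INTO %s VALUES 1, 2, 3", trinoTableName(baseTableName)));
        assertThat(onSpark().executeQuery(format("SELECT count(*) FROM %s WHERE testInteger = 2", sparkTableName(baseTableName))))
                .containsOnly(row(1L));
        onSpark().executeQuery("DROP TABLE " + sparkTableName(baseTableName));
    }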
Case-sensitive column names do not seem to be handled as expected.
Force-pushed from 2c5d886 to 7a59385 (Compare)
@@ -170,6 +172,7 @@ private IcebergFileWriter createParquetWriter(
        .setMaxPageValueCount(getParquetWriterPageValueCount(session))
        .setMaxBlockSize(getParquetWriterBlockSize(session))
        .setBatchSize(getParquetWriterBatchSize(session))
        .setBloomFilterColumns(getParquetBloomFilterColumns(storageProperties))
A few lines above you have the original fileColumnNames; please correlate those with what is specified in the table properties (case-insensitive name matching) in getParquetBloomFilterColumns.
Also a new test to add: schema evolution. Create a table with a bunch of bloom filter columns, drop one of the columns which was specified as a bloom filter column, and make sure that you don't get any errors. I'm guessing we'd have to filter out, in getParquetBloomFilterColumns, the column names which don't exist anymore.
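A minimal sketch of that correlation, assuming getParquetBloomFilterColumns gains access to fileColumnNames (the signature change and the Guava toImmutableList/toImmutableSet static imports are assumptions):

    private static List<String> getParquetBloomFilterColumns(Map<String, String> storageProperties, List<String> fileColumnNames)
    {
        Set<String> requested = storageProperties.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX) && "true".equals(entry.getValue()))
                .map(entry -> entry.getKey().substring(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX.length()).toLowerCase(ENGLISH))
                .collect(toImmutableSet());
        // resolve against the writer's actual columns: case-insensitive match,
        // keep the file's casing, and silently drop columns that no longer exist
        return fileColumnNames.stream()
                .filter(column -> requested.contains(column.toLowerCase(ENGLISH)))
                .collect(toImmutableList());
    }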
The writing logic ignores non-existent columns for which the Bloom filter property is set.
@@ -2924,6 +2924,114 @@ public void testSparkAlterStructColumnType(StorageFormat storageFormat)
        onSpark().executeQuery("DROP TABLE " + sparkTableName);
    }

    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
    public void testSparkReadingTrinoBloomFilters()
Either rename to testSparkReadingTrinoParquetBloomFilters or add logic to run for both Parquet and ORC. For an effective bloom filter in ORC you need over 10_000 rows:
if (rowsInRowGroup.isPresent() && stripe.getNumberOfRows() > rowsInRowGroup.getAsInt()) {
Let's not bring ORC tests into this PR; renaming to testSparkReadingTrinoParquetBloomFilters is fine.
...roduct-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java (resolved comment)
{
    return properties.entrySet().stream()
            .filter(entry -> entry.getKey().startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX) && "true".equals(entry.getValue()))
            .map(entry -> entry.getKey().substring(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX.length()).toLowerCase(Locale.ENGLISH))
This is incorrect; if you lowercase here, you are affecting the Parquet write logic as well.
Do we have other code in the Iceberg connector where we need to handle the case sensitivity of Iceberg columns and the case insensitivity of Trino columns? That would be useful for understanding how to handle this case.
@raunaqmorarka, @jkylling sorry for delaying this work with my comments. The scaffolding needed for getting query stats is not present at the moment in the product tests. @jkylling pls wrap up the work (I think
Force-pushed from 7a59385 to 6612eff (Compare)
...roduct-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java (resolved comment)
@jkylling will this improve reads from tables with bloom filters, or does it only deal with creating bloom filters?
@shohamyamin This only adds write support. Read support for Bloom filters was already added in 406. Please see #9471 for the issue which tracked this.
Force-pushed from 6612eff to ac3b281 (Compare)
Description
Additional context and related issues
Part of #21570
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: