-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support converting column stats on row type to json in Delta Lake #14314
Conversation
a5a1264
to
497e650
Compare
497e650
to
8c61387
Compare
CI hit #14391 at
|
ImmutableMap.Builder<String, Object> fieldValues = ImmutableMap.builder(); | ||
for (int i = 0; i < rowBlock.getPositionCount(); i++) { | ||
RowType.Field field = rowType.getFields().get(i); | ||
Object fieldValue = readNativeValue(field.getType(), rowBlock.getChildren().get(i), i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than getChildren
I think you want to convert the rowBlock to a ColumnarRow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The argument is SingleRowBlock
which is unsupported in ColumnarRow#toColumnarRow
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's surprising, toColumnarRow
checks that the input is an instance of AbstractRowBlock, which SingleRowBlock extends. Seems like it should work.
Where does the error come from?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
toColumnarRow checks that the input is an instance of AbstractRowBlock, which SingleRowBlock extends. Seems like it should work.
SingleRowBlock extends AbstractSingleRowBlock, not AbstractRowBlock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, sorry I can't read
.../src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java
Show resolved
Hide resolved
.../src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java
Outdated
Show resolved
Hide resolved
.../src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java
Show resolved
Hide resolved
CI hit #14391 |
// The first two entries created by Databricks have column stats. The last one doesn't have column stats because the connector doesn't support collecting it on row columns. | ||
List<AddFileEntry> addFileEntries = getAddFileEntries("json_stats_on_row_type").stream().sorted(comparing(AddFileEntry::getModificationTime)).collect(toImmutableList()); | ||
assertThat(addFileEntries).hasSize(3); | ||
assertJsonStatistics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assertions for addFileEntries.get(0)
and addFileEntries.get(1)
are not relevant. The stats already existed there before running the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, they're relevant. Those two assertions fail if we don't copy the statistics.
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; | ||
import static io.trino.testing.TestingConnectorSession.SESSION; | ||
|
||
public final class TestDeltaLakeUtils |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test -> Testing
{ | ||
private TestDeltaLakeUtils() {} | ||
|
||
public static List<AddFileEntry> getAddFileEntries(SchemaTableName table, String tableLocation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The table
has no impact on the result of this method, so you can remove this parameter and use eg new SchemaTableName("dummy_schema_placeholder", "dummy_table_placeholder")
below
@@ -222,6 +240,37 @@ private static Map<String, Object> jsonEncode(Map<String, Optional<Statistics<?> | |||
.collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().get())); | |||
} | |||
|
|||
public static Map<String, Object> toNullCounts(Map<String, Type> columnTypeMapping, Map<String, Object> values) | |||
{ | |||
verify(columnTypeMapping.keySet().containsAll(values.keySet()), "columnTypeMapping should contains all keys of values"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
include the key sets in the message
also, would be nice to add a comment why this is expected. it's not obvious to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw instead of this check here, i'd rather have a non-null check on type
after Type type = columnTypeMapping.get(value.getKey());
line
public void testConvertJsonStatisticsToParquetOnRowType() | ||
throws Exception | ||
{ | ||
verifySupportsInsert(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a "verify ..." should verify, i.e. ensure something is true
as a follow-up we could rename this to eg skipUnlessInsertsSupported
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will send a follow-up PR.
@@ -0,0 +1 @@ | |||
{"version":2,"size":4} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the test, do we need transaction json files before the checkpoint (0 and 1) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those files aren't required. Removed.
assertUpdate("INSERT INTO json_stats_on_row_type SELECT CAST(row(3) AS row(x bigint)), CAST(row(row('test insert')) AS row(y row(nested varchar)))", 1); | ||
|
||
// The first two entries created by Databricks have column stats. The last one doesn't have column stats because the connector doesn't support collecting it on row columns. | ||
List<AddFileEntry> addFileEntries = getAddFileEntries("json_stats_on_row_type").stream().sorted(comparing(AddFileEntry::getModificationTime)).collect(toImmutableList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do the getAddFileEntries
come from a new snapshot that we just created, or from previous snapshot + transaction log files?
i think the intention is that we create transaction 4 and a checkpoint, so let's verify that happened
Data generated using Databricks 10.4: | ||
|
||
```sql | ||
CREATE TABLE default.json_stats_on_row_type | ||
(struct_col struct<x bigint>, nested_struct_col struct<y struct<nested string>>) | ||
USING DELTA | ||
LOCATION 's3://bucket/table' | ||
TBLPROPERTIES ( | ||
delta.checkpointInterval = 2, | ||
delta.checkpoint.writeStatsAsJson = false, | ||
delta.checkpoint.writeStatsAsStruct = true | ||
); | ||
|
||
INSERT INTO default.json_stats_on_row_type SELECT named_struct('x', 1), named_struct('y', named_struct('nested', 'test')); | ||
INSERT INTO default.json_stats_on_row_type SELECT named_struct('x', NULL), named_struct('y', named_struct('nested', NULL)); | ||
|
||
ALTER TABLE default.json_stats_on_row_type SET TBLPROPERTIES ( | ||
'delta.checkpoint.writeStatsAsJson' = true, | ||
'delta.checkpoint.writeStatsAsStruct' = false | ||
); | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
Additionally, remove a redundant argument.
2312115
to
bcbbc9f
Compare
Description
Fixes #13996
Release notes
(x) This is not user-visible or docs only and no release notes are required.