diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst
index f6ebca7a9a19..15e2f3b05e08 100644
--- a/docs/src/main/sphinx/connector/delta-lake.rst
+++ b/docs/src/main/sphinx/connector/delta-lake.rst
@@ -505,7 +505,7 @@ Table statistics
 ^^^^^^^^^^^^^^^^
 
 You can use :doc:`/sql/analyze` statements in Trino to populate the table
-statistics in Delta Lake. Number of distinct values (NDV)
+statistics in Delta Lake. Data size and number of distinct values (NDV)
 statistics are supported, while Minimum value, maximum value, and null value
 count statistics are not supported. The :doc:`cost-based optimizer
 </optimizer/cost-based-optimizations>` then uses these statistics to improve
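
Usage sketch (not part of the patch): the documentation change above is easier to read next to the statements it describes. The catalog and table names are made up, and the sample row comes from the updated test expectations further down, where each SHOW STATS row has the shape (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value):

    -- collect extended statistics, now including data size for variable-width columns
    ANALYZE delta.default.test_nation;

    -- data_size is then reported for columns such as comment and name; for a 25-row
    -- copy of tpch nation the tests below expect ('comment', 1857.0, 25.0, 0.0, null, null, null)
    SHOW STATS FOR delta.default.test_nation;
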
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index 9858b4221bc8..347ddb36a09a 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -18,6 +18,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableTable;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import io.airlift.json.JsonCodec;
@@ -59,6 +60,7 @@
 import io.trino.plugin.hive.security.AccessControlMetadata;
 import io.trino.spi.NodeManager;
 import io.trino.spi.TrinoException;
+import io.trino.spi.block.Block;
 import io.trino.spi.connector.Assignment;
 import io.trino.spi.connector.BeginTableExecuteResult;
 import io.trino.spi.connector.CatalogSchemaName;
@@ -95,11 +97,13 @@
 import io.trino.spi.security.RoleGrant;
 import io.trino.spi.security.TrinoPrincipal;
 import io.trino.spi.statistics.ColumnStatisticMetadata;
+import io.trino.spi.statistics.ColumnStatisticType;
 import io.trino.spi.statistics.ComputedStatistics;
 import io.trino.spi.statistics.TableStatistics;
 import io.trino.spi.statistics.TableStatisticsMetadata;
 import io.trino.spi.type.ArrayType;
 import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.FixedWidthType;
 import io.trino.spi.type.HyperLogLogType;
 import io.trino.spi.type.MapType;
 import io.trino.spi.type.RowType;
@@ -128,6 +132,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
@@ -195,6 +200,7 @@
 import static io.trino.spi.predicate.ValueSet.ofRanges;
 import static io.trino.spi.statistics.ColumnStatisticType.MAX_VALUE;
 import static io.trino.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES_SUMMARY;
+import static io.trino.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.BooleanType.BOOLEAN;
 import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
@@ -242,6 +248,10 @@ public class DeltaLakeMetadata
     // Matches the dummy column Databricks stores in the metastore
     private static final List<Column> DUMMY_DATA_COLUMNS = ImmutableList.of(
             new Column("col", HiveType.toHiveType(new ArrayType(VarcharType.createUnboundedVarcharType())), Optional.empty()));
+    private static final Set<ColumnStatisticType> SUPPORTED_STATISTICS_TYPE = ImmutableSet.<ColumnStatisticType>builder()
+            .add(TOTAL_SIZE_IN_BYTES)
+            .add(NUMBER_OF_DISTINCT_VALUES_SUMMARY)
+            .build();
 
     private final DeltaLakeMetastore metastore;
     private final HdfsEnvironment hdfsEnvironment;
@@ -1932,8 +1942,14 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
                         analyzeColumnNames
                                 .map(columnNames -> columnNames.contains(columnMetadata.getName()))
                                 .orElse(true))
-                .map(columnMetadata -> new ColumnStatisticMetadata(columnMetadata.getName(), NUMBER_OF_DISTINCT_VALUES_SUMMARY))
-                .forEach(columnStatistics::add);
+                .forEach(columnMetadata -> {
+                    if (!(columnMetadata.getType() instanceof FixedWidthType)) {
+                        if (statistics.isEmpty() || totalSizeStatisticsExists(statistics.get().getColumnStatistics(), columnMetadata.getName())) {
+                            columnStatistics.add(new ColumnStatisticMetadata(columnMetadata.getName(), TOTAL_SIZE_IN_BYTES));
+                        }
+                    }
+                    columnStatistics.add(new ColumnStatisticMetadata(columnMetadata.getName(), NUMBER_OF_DISTINCT_VALUES_SUMMARY));
+                });
 
         // collect max(file modification time) for sake of incremental ANALYZE
         columnStatistics.add(new ColumnStatisticMetadata(FILE_MODIFIED_TIME_COLUMN_NAME, MAX_VALUE));
@@ -1958,6 +1974,11 @@ private static boolean shouldCollectExtendedStatistics(ColumnMetadata columnMeta
         return true;
     }
 
+    private static boolean totalSizeStatisticsExists(Map<String, DeltaLakeColumnStatistics> statistics, String columnName)
+    {
+        return statistics.containsKey(columnName) && statistics.get(columnName).getTotalSizeInBytes().isPresent();
+    }
+
     @Override
     public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
@@ -2130,26 +2151,53 @@ private static Map<String, DeltaLakeColumnStatistics> toDeltaLakeColumnStatistic
     {
         // Only statistics for whole table are collected
         ComputedStatistics singleStatistics = Iterables.getOnlyElement(computedStatistics);
+        return createColumnToComputedStatisticsMap(singleStatistics.getColumnStatistics()).entrySet().stream()
+                .collect(toImmutableMap(Map.Entry::getKey, entry -> createDeltaLakeColumnStatistics(entry.getValue())));
+    }
 
-        return singleStatistics.getColumnStatistics().entrySet().stream()
-                .filter(not(entry -> entry.getKey().getColumnName().equals(FILE_MODIFIED_TIME_COLUMN_NAME)))
-                .collect(toImmutableMap(
-                        entry -> entry.getKey().getColumnName(),
-                        entry -> {
-                            ColumnStatisticMetadata columnStatisticMetadata = entry.getKey();
-                            if (columnStatisticMetadata.getStatisticType() != NUMBER_OF_DISTINCT_VALUES_SUMMARY) {
-                                throw new TrinoException(
-                                        GENERIC_INTERNAL_ERROR,
-                                        "Unexpected statistics type " + columnStatisticMetadata.getStatisticType() + " found for column " + columnStatisticMetadata.getColumnName());
-                            }
-                            if (entry.getValue().isNull(0)) {
-                                return DeltaLakeColumnStatistics.create(HyperLogLog.newInstance(4096)); // empty HLL with number of buckets used by $approx_set
-                            }
-                            else {
-                                Slice serializedSummary = (Slice) blockToNativeValue(HyperLogLogType.HYPER_LOG_LOG, entry.getValue());
-                                return DeltaLakeColumnStatistics.create(HyperLogLog.newInstance(serializedSummary));
-                            }
-                        }));
+    private static Map<String, Map<ColumnStatisticType, Block>> createColumnToComputedStatisticsMap(Map<ColumnStatisticMetadata, Block> computedStatistics)
+    {
+        ImmutableTable.Builder<String, ColumnStatisticType, Block> result = ImmutableTable.builder();
+        computedStatistics.forEach((metadata, block) -> {
+            if (metadata.getColumnName().equals(FILE_MODIFIED_TIME_COLUMN_NAME)) {
+                return;
+            }
+            if (!SUPPORTED_STATISTICS_TYPE.contains(metadata.getStatisticType())) {
+                throw new TrinoException(
+                        GENERIC_INTERNAL_ERROR,
+                        "Unexpected statistics type " + metadata.getStatisticType() + " found for column " + metadata.getColumnName());
+            }
+
+            result.put(metadata.getColumnName(), metadata.getStatisticType(), block);
+        });
+        return result.buildOrThrow().rowMap();
+    }
+
+    private static DeltaLakeColumnStatistics createDeltaLakeColumnStatistics(Map<ColumnStatisticType, Block> computedStatistics)
+    {
+        OptionalLong totalSize = OptionalLong.empty();
+        if (computedStatistics.containsKey(TOTAL_SIZE_IN_BYTES)) {
+            totalSize = getLongValue(computedStatistics.get(TOTAL_SIZE_IN_BYTES));
+        }
+        HyperLogLog ndvSummary = getHyperLogLogForNdv(computedStatistics.get(NUMBER_OF_DISTINCT_VALUES_SUMMARY));
+        return DeltaLakeColumnStatistics.create(totalSize, ndvSummary);
+    }
+
+    private static OptionalLong getLongValue(Block block)
+    {
+        if (block.isNull(0)) {
+            return OptionalLong.of(0);
+        }
+        return OptionalLong.of(BIGINT.getLong(block, 0));
+    }
+
+    private static HyperLogLog getHyperLogLogForNdv(Block block)
+    {
+        if (block.isNull(0)) {
+            return HyperLogLog.newInstance(4096); // number of buckets used by $approx_set
+        }
+        Slice serializedSummary = (Slice) blockToNativeValue(HyperLogLogType.HYPER_LOG_LOG, block);
+        return HyperLogLog.newInstance(serializedSummary);
     }
 
     private static Optional<Instant> getMaxFileModificationTime(Collection<ComputedStatistics> computedStatistics)
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java
index b1cca97c0d17..eb9c54fc5b09 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java
@@ -352,6 +352,7 @@ else if (isValidInRange(minValue)) {
             if (statistics.isPresent()) {
                 DeltaLakeColumnStatistics deltaLakeColumnStatistics = statistics.get().getColumnStatistics().get(column.getName());
                 if (deltaLakeColumnStatistics != null && column.getColumnType() != PARTITION_KEY) {
+                    deltaLakeColumnStatistics.getTotalSizeInBytes().ifPresent(size -> columnStatsBuilder.setDataSize(Estimate.of(size)));
                     columnStatsBuilder.setDistinctValuesCount(Estimate.of(deltaLakeColumnStatistics.getNdvSummary().cardinality()));
                 }
             }
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/DeltaLakeColumnStatistics.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/DeltaLakeColumnStatistics.java
index d31b2ca214ef..a7e49b2f9f0e 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/DeltaLakeColumnStatistics.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/DeltaLakeColumnStatistics.java
@@ -19,32 +19,43 @@
 import io.airlift.stats.cardinality.HyperLogLog;
 
 import java.util.Base64;
+import java.util.OptionalLong;
 
 import static java.util.Objects.requireNonNull;
 
 public class DeltaLakeColumnStatistics
 {
+    private final OptionalLong totalSizeInBytes;
     private final HyperLogLog ndvSummary;
 
     @JsonCreator
     public static DeltaLakeColumnStatistics create(
+            @JsonProperty("totalSizeInBytes") OptionalLong totalSizeInBytes,
             @JsonProperty("ndvSummary") String ndvSummaryBase64)
     {
+        requireNonNull(totalSizeInBytes, "totalSizeInBytes is null");
         requireNonNull(ndvSummaryBase64, "ndvSummaryBase64 is null");
         byte[] ndvSummaryBytes = Base64.getDecoder().decode(ndvSummaryBase64);
-        return new DeltaLakeColumnStatistics(HyperLogLog.newInstance(Slices.wrappedBuffer(ndvSummaryBytes)));
+        return new DeltaLakeColumnStatistics(totalSizeInBytes, HyperLogLog.newInstance(Slices.wrappedBuffer(ndvSummaryBytes)));
     }
 
-    public static DeltaLakeColumnStatistics create(HyperLogLog ndvSummary)
+    public static DeltaLakeColumnStatistics create(OptionalLong totalSizeInBytes, HyperLogLog ndvSummary)
     {
-        return new DeltaLakeColumnStatistics(ndvSummary);
+        return new DeltaLakeColumnStatistics(totalSizeInBytes, ndvSummary);
     }
 
-    private DeltaLakeColumnStatistics(HyperLogLog ndvSummary)
+    private DeltaLakeColumnStatistics(OptionalLong totalSizeInBytes, HyperLogLog ndvSummary)
     {
+        this.totalSizeInBytes = requireNonNull(totalSizeInBytes, "totalSizeInBytes is null");
         this.ndvSummary = requireNonNull(ndvSummary, "ndvSummary is null");
     }
 
+    @JsonProperty
+    public OptionalLong getTotalSizeInBytes()
+    {
+        return totalSizeInBytes;
+    }
+
     @JsonProperty("ndvSummary")
     public String getNdvSummaryBase64()
     {
@@ -58,8 +69,17 @@ public HyperLogLog getNdvSummary()
 
     public DeltaLakeColumnStatistics update(DeltaLakeColumnStatistics newStatistics)
     {
+        OptionalLong totalSizeInBytes = mergeIntegerStatistics(this.totalSizeInBytes, newStatistics.totalSizeInBytes);
         HyperLogLog ndvSummary = HyperLogLog.newInstance(this.ndvSummary.serialize());
         ndvSummary.mergeWith(newStatistics.ndvSummary);
-        return new DeltaLakeColumnStatistics(ndvSummary);
+        return new DeltaLakeColumnStatistics(totalSizeInBytes, ndvSummary);
+    }
+
+    private static OptionalLong mergeIntegerStatistics(OptionalLong first, OptionalLong second)
+    {
+        if (first.isPresent() && second.isPresent()) {
+            return OptionalLong.of(first.getAsLong() + second.getAsLong());
+        }
+        return OptionalLong.empty();
     }
 }
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
index 53856b3b7a0e..1465e05e660a 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
@@ -1155,8 +1155,8 @@ public void testAnalyze()
                 "VALUES " +
                         "('nationkey', null, 25.0, 0.0, null, 0, 24)," +
                         "('regionkey', null, 5.0, 0.0, null, 0, 4)," +
-                        "('comment', null, 25.0, 0.0, null, null, null)," +
-                        "('name', null, 25.0, 0.0, null, null, null)," +
+                        "('comment', 1857.0, 25.0, 0.0, null, null, null)," +
+                        "('name', 177.0, 25.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 25.0, null, null)");
     }
 
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java
index 889cda171eb5..a23eec09d689 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java
@@ -103,8 +103,8 @@ private void testAnalyze(Optional checkpointInterval)
                 "VALUES " +
                         "('nationkey', null, 25.0, 0.0, null, 0, 24)," +
                         "('regionkey', null, 5.0, 0.0, null, 0, 4)," +
-                        "('comment', null, 25.0, 0.0, null, null, null)," +
-                        "('name', null, 25.0, 0.0, null, null, null)," +
+                        "('comment', 1857.0, 25.0, 0.0, null, null, null)," +
+                        "('name', 177.0, 25.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 25.0, null, null)");
 
         // reanalyze data (1 split is empty values)
@@ -115,8 +115,8 @@ private void testAnalyze(Optional checkpointInterval)
                 "VALUES " +
                         "('nationkey', null, 25.0, 0.0, null, 0, 24)," +
                         "('regionkey', null, 5.0, 0.0, null, 0, 4)," +
-                        "('comment', null, 25.0, 0.0, null, null, null)," +
-                        "('name', null, 25.0, 0.0, null, null, null)," +
+                        "('comment', 1857.0, 25.0, 0.0, null, null, null)," +
+                        "('name', 177.0, 25.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 25.0, null, null)");
 
         // insert one more copy; should not influence stats other than rowcount
@@ -129,24 +129,24 @@ private void testAnalyze(Optional checkpointInterval)
                 "VALUES " +
                         "('nationkey', null, 25.0, 0.0, null, 0, 24)," +
                         "('regionkey', null, 5.0, 0.0, null, 0, 4)," +
-                        "('comment', null, 25.0, 0.0, null, null, null)," +
-                        "('name', null, 25.0, 0.0, null, null, null)," +
+                        "('comment', 3714.0, 25.0, 0.0, null, null, null)," +
+                        "('name', 354.0, 25.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 50.0, null, null)");
 
         // insert modified rows
         assertUpdate("INSERT INTO " + tableName + " SELECT nationkey + 25, reverse(name), regionkey + 5, reverse(comment) FROM tpch.sf1.nation", 25);
 
-        // without ANALYZE all stats but NDV should be updated
+        // without ANALYZE all stats but size and NDV should be updated
         assertQuery(
                 "SHOW STATS FOR " + tableName,
                 "VALUES " +
                         "('nationkey', null, 25.0, 0.0, null, 0, 49)," +
                         "('regionkey', null, 5.0, 0.0, null, 0, 9)," +
-                        "('comment', null, 25.0, 0.0, null, null, null)," +
-                        "('name', null, 25.0, 0.0, null, null, null)," +
+                        "('comment', 3714.0, 25.0, 0.0, null, null, null)," +
+                        "('name', 354.0, 25.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 75.0, null, null)");
 
-        // with analyze we should get new NDV
+        // with analyze we should get new size and NDV
         runAnalyzeVerifySplitCount(tableName, 1);
 
         assertQuery(
@@ -154,8 +154,8 @@ private void testAnalyze(Optional checkpointInterval)
                 "VALUES " +
                         "('nationkey', null, 50.0, 0.0, null, 0, 49)," +
                         "('regionkey', null, 10.0, 0.0, null, 0, 9)," +
-                        "('comment', null, 50.0, 0.0, null, null, null)," +
-                        "('name', null, 50.0, 0.0, null, null, null)," +
+                        "('comment', 5571.0, 50.0, 0.0, null, null, null)," +
+                        "('name', 531.0, 50.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 75.0, null, null)");
     }
 
@@ -186,8 +186,8 @@ public void testAnalyzePartitioned()
                 "VALUES " +
                         "('nationkey', null, 25.0, 0.0, null, 0, 24)," +
                         "('regionkey', null, 5.0, 0.0, null, null, null)," +
-                        "('comment', null, 25.0, 0.0, null, null, null)," +
-                        "('name', null, 25.0, 0.0, null, null, null)," +
+                        "('comment', 1857.0, 25.0, 0.0, null, null, null)," +
+                        "('name', 177.0, 25.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 25.0, null, null)");
 
         // insert one more copy; should not influence stats other than rowcount
@@ -200,32 +200,32 @@ public void testAnalyzePartitioned()
                 "VALUES " +
                         "('nationkey', null, 25.0, 0.0, null, 0, 24)," +
                         "('regionkey', null, 5.0, 0.0, null, null, null)," +
-                        "('comment', null, 25.0, 0.0, null, null, null)," +
-                        "('name', null, 25.0, 0.0, null, null, null)," +
+                        "('comment', 3714.0, 25.0, 0.0, null, null, null)," +
+                        "('name', 354.0, 25.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 50.0, null, null)");
 
         // insert modified rows
         assertUpdate("INSERT INTO " + tableName + " SELECT nationkey + 25, reverse(name), regionkey + 5, reverse(comment) FROM tpch.sf1.nation", 25);
 
-        // without ANALYZE all stats but NDV should be updated
+        // without ANALYZE all stats but size and NDV should be updated
         assertQuery(
                 "SHOW STATS FOR " + tableName,
                 "VALUES " +
                         "('nationkey', null, 25.0, 0.0, null, 0, 49)," +
                         "('regionkey', null, 10.0, 0.0, null, null, null)," +
-                        "('comment', null, 25.0, 0.0, null, null, null)," +
-                        "('name', null, 25.0, 0.0, null, null, null)," +
+                        "('comment', 3714.0, 25.0, 0.0, null, null, null)," +
+                        "('name', 354.0, 25.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 75.0, null, null)");
 
-        // with analyze we should get new NDV
+        // with analyze we should get new size and NDV
         runAnalyzeVerifySplitCount(tableName, 5);
 
         assertQuery(
                 "SHOW STATS FOR " + tableName,
                 "VALUES " +
                         "('nationkey', null, 50.0, 0.0, null, 0, 49)," +
                         "('regionkey', null, 10.0, 0.0, null, null, null)," +
-                        "('comment', null, 50.0, 0.0, null, null, null)," +
-                        "('name', null, 50.0, 0.0, null, null, null)," +
+                        "('comment', 5571.0, 50.0, 0.0, null, null, null)," +
+                        "('name', 531.0, 50.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 75.0, null, null)");
     }
@@ -269,8 +269,8 @@ public void testAnalyzeEmpty()
                 "VALUES " +
                         "('nationkey', null, 25.0, 0.0, null, 0, 24)," +
                         "('regionkey', null, 5.0, 0.0, null, 0, 4)," +
-                        "('comment', null, 25.0, 0.0, null, null, null)," +
-                        "('name', null, 25.0, 0.0, null, null, null)," +
+                        "('comment', 1857.0, 25.0, 0.0, null, null, null)," +
+                        "('name', 177.0, 25.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 25.0, null, null)");
     }
 
@@ -326,8 +326,8 @@ public void testAnalyzeWithFilesModifiedAfter()
                 "VALUES " +
                         "('nationkey', null, 5.0, 0.0, null, 0, 24)," +
                         "('regionkey', null, 3.0, 0.0, null, 0, 4)," +
-                        "('comment', null, 5.0, 0.0, null, null, null)," +
-                        "('name', null, 5.0, 0.0, null, null, null)," +
+                        "('comment', 434.0, 5.0, 0.0, null, null, null)," +
+                        "('name', 33.0, 5.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 30.0, null, null)");
     }
 
@@ -391,8 +391,8 @@ public void testAnalyzeSomeColumns()
                 "VALUES " +
                         "('nationkey', null, 50.0, 0.0, null, 0, 49)," +
                         "('regionkey', null, 10.0, 0.0, null, 0, 9)," +
-                        "('comment', null, 50.0, 0.0, null, null, null)," +
-                        "('name', null, 50.0, 0.0, null, null, null)," +
+                        "('comment', 3764.0, 50.0, 0.0, null, null, null)," +
+                        "('name', 379.0, 50.0, 0.0, null, null, null)," +
                         "(null, null, null, null, 50.0, null, null)");
 
         // we and we should be able to reanalyze with a subset of columns
@@ -438,8 +438,8 @@ public void testDropExtendedStats()
         String extendedStats = "VALUES"
                 + "('nationkey', null, 25.0, 0.0, null, 0, 24),"
                 + "('regionkey', null, 5.0, 0.0, null, 0, 4),"
-                + "('comment', null, 25.0, 0.0, null, null, null),"
-                + "('name', null, 25.0, 0.0, null, null, null),"
+                + "('comment', 1857.0, 25.0, 0.0, null, null, null),"
+                + "('name', 177.0, 25.0, 0.0, null, null, null),"
                 + "(null, null, null, null, 25.0, null, null)";
 
         assertQuery(query, baseStats);
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
index 64228e4b55bd..9ac0ecbec7f1 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
@@ -23,6 +23,7 @@
 import io.trino.plugin.deltalake.DeltaLakeConfig;
 import io.trino.plugin.deltalake.DeltaLakeTableHandle;
 import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess;
+import io.trino.plugin.deltalake.statistics.DeltaLakeColumnStatistics;
 import io.trino.plugin.deltalake.statistics.ExtendedStatistics;
 import io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
@@ -90,6 +91,7 @@ public class TestDeltaLakeMetastoreStatistics
     private DeltaLakeMetastore deltaLakeMetastore;
     private HiveMetastore hiveMetastore;
+    private CachingExtendedStatisticsAccess statistics;
 
     @BeforeClass
     public void setupMetastore()
@@ -123,7 +125,7 @@ public void setupMetastore()
 
         hiveMetastore.createDatabase(new Database("db_name", Optional.empty(), Optional.of("test"), Optional.of(PrincipalType.USER), Optional.empty(), ImmutableMap.of()));
 
-        CachingExtendedStatisticsAccess statistics = new CachingExtendedStatisticsAccess(new MetaDirStatisticsAccess(hdfsEnvironment, new JsonCodecFactory().jsonCodec(ExtendedStatistics.class)));
+        statistics = new CachingExtendedStatisticsAccess(new MetaDirStatisticsAccess(hdfsEnvironment, new JsonCodecFactory().jsonCodec(ExtendedStatistics.class)));
         deltaLakeMetastore = new HiveMetastoreBackedDeltaLakeMetastore(
                 hiveMetastore,
                 transactionLogAccess,
@@ -440,4 +442,54 @@ public void testStatisticsParquetParsedStatisticsNullCount()
         ColumnStatistics columnStats = statisticsMap.get(new DeltaLakeColumnHandle("i", INTEGER, REGULAR));
         assertEquals(columnStats.getNullsFraction(), Estimate.of(3.0 / 9.0));
     }
+
+    @Test
+    public void testExtendedStatisticsWithoutDataSize()
+    {
+        // Read extended_stats.json that was generated before supporting data_size
+        String tableLocation = Resources.getResource("statistics/extended_stats_without_data_size").toExternalForm();
+        Optional<ExtendedStatistics> extendedStatistics = statistics.readExtendedStatistics(SESSION, tableLocation);
+        assertThat(extendedStatistics).isNotEmpty();
+        Map<String, DeltaLakeColumnStatistics> columnStatistics = extendedStatistics.get().getColumnStatistics();
+        assertThat(columnStatistics).hasSize(3);
+    }
+
+    @Test
+    public void testExtendedStatisticsWithDataSize()
+    {
+        // Read extended_stats.json that was generated after supporting data_size
+        String tableLocation = Resources.getResource("statistics/extended_stats_with_data_size").toExternalForm();
+        Optional<ExtendedStatistics> extendedStatistics = statistics.readExtendedStatistics(SESSION, tableLocation);
+        assertThat(extendedStatistics).isNotEmpty();
+        Map<String, DeltaLakeColumnStatistics> columnStatistics = extendedStatistics.get().getColumnStatistics();
+        assertThat(columnStatistics).hasSize(3);
+        assertEquals(columnStatistics.get("regionkey").getTotalSizeInBytes(), OptionalLong.empty());
+        assertEquals(columnStatistics.get("name").getTotalSizeInBytes(), OptionalLong.of(34));
+        assertEquals(columnStatistics.get("comment").getTotalSizeInBytes(), OptionalLong.of(330));
+    }
+
+    @Test
+    public void testMergeExtendedStatisticsWithoutAndWithDataSize()
+    {
+        // Merge two extended stats files. The first file doesn't have totalSizeInBytes field and the second file has totalSizeInBytes field
+        Optional<ExtendedStatistics> statisticsWithoutDataSize = statistics.readExtendedStatistics(SESSION, Resources.getResource("statistics/extended_stats_without_data_size").toExternalForm());
+        Optional<ExtendedStatistics> statisticsWithDataSize = statistics.readExtendedStatistics(SESSION, Resources.getResource("statistics/extended_stats_with_data_size").toExternalForm());
+        assertThat(statisticsWithoutDataSize).isNotEmpty();
+        assertThat(statisticsWithDataSize).isNotEmpty();
+
+        Map<String, DeltaLakeColumnStatistics> columnStatisticsWithoutDataSize = statisticsWithoutDataSize.get().getColumnStatistics();
+        Map<String, DeltaLakeColumnStatistics> columnStatisticsWithDataSize = statisticsWithDataSize.get().getColumnStatistics();
+
+        DeltaLakeColumnStatistics mergedRegionKey = columnStatisticsWithoutDataSize.get("regionkey").update(columnStatisticsWithDataSize.get("regionkey"));
+        assertEquals(mergedRegionKey.getTotalSizeInBytes(), OptionalLong.empty());
+        assertEquals(mergedRegionKey.getNdvSummary().cardinality(), 5);
+
+        DeltaLakeColumnStatistics mergedName = columnStatisticsWithoutDataSize.get("name").update(columnStatisticsWithDataSize.get("name"));
+        assertEquals(mergedName.getTotalSizeInBytes(), OptionalLong.empty());
+        assertEquals(mergedName.getNdvSummary().cardinality(), 5);
+
+        DeltaLakeColumnStatistics mergedComment = columnStatisticsWithoutDataSize.get("comment").update(columnStatisticsWithDataSize.get("comment"));
+        assertEquals(mergedComment.getTotalSizeInBytes(), OptionalLong.empty());
+        assertEquals(mergedComment.getNdvSummary().cardinality(), 5);
+    }
 }
diff --git a/plugin/trino-delta-lake/src/test/resources/statistics/extended_stats_with_data_size/_delta_log/_trino_meta/extended_stats.json b/plugin/trino-delta-lake/src/test/resources/statistics/extended_stats_with_data_size/_delta_log/_trino_meta/extended_stats.json
new file mode 100644
index 000000000000..6befc5e9fbde
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/resources/statistics/extended_stats_with_data_size/_delta_log/_trino_meta/extended_stats.json
@@ -0,0 +1 @@
+{"modelVersion":4,"alreadyAnalyzedModifiedTimeMax":"2022-06-14T00:22:45.832Z","columnStatistics":{"regionkey":{"ndvSummary":"AgwFAMAJpivCask0QBa4hwHLKZ9HPsfq"},"name":{"totalSizeInBytes":34,"ndvSummary":"AgwFAIDfcjhBaBVVQ5jHhcHx/PZAbXX+"},"comment":{"totalSizeInBytes":330,"ndvSummary":"AgwFAIBFHELBdPNTwUY4mQCYOLrBl1fx"}}}
diff --git a/plugin/trino-delta-lake/src/test/resources/statistics/extended_stats_without_data_size/_delta_log/_trino_meta/extended_stats.json b/plugin/trino-delta-lake/src/test/resources/statistics/extended_stats_without_data_size/_delta_log/_trino_meta/extended_stats.json
new file mode 100644
index 000000000000..b6d6b3836f2c
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/resources/statistics/extended_stats_without_data_size/_delta_log/_trino_meta/extended_stats.json
@@ -0,0 +1 @@
+{"modelVersion":4,"alreadyAnalyzedModifiedTimeMax":"2022-06-13T05:40:24.561Z","columnStatistics":{"regionkey":{"ndvSummary":"AgwFAMAJpivCask0QBa4hwHLKZ9HPsfq"},"name":{"ndvSummary":"AgwFAIDfcjhBaBVVQ5jHhcHx/PZAbXX+"},"comment":{"ndvSummary":"AgwFAIBFHELBdPNTwUY4mQCYOLrBl1fx"}}}
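
Re-collection sketch (not part of the patch): the connector code above also records the maximum file modification time so a later ANALYZE can run incrementally, and the tests exercise analyzing only a subset of columns (testAnalyzeSomeColumns) and only recently modified files (testAnalyzeWithFilesModifiedAfter). The statements below are assumptions about the corresponding ANALYZE properties; verify the property names and syntax against the connector documentation:

    -- restrict statistics collection to selected columns
    ANALYZE delta.default.test_nation WITH (columns = ARRAY['comment', 'name']);

    -- only read files written after the given point in time
    ANALYZE delta.default.test_nation WITH (files_modified_after = TIMESTAMP '2022-06-14 00:00:00 UTC');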