diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java index 22cf672d5081..6df4c8722fcf 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java @@ -94,7 +94,7 @@ public TableStatisticAggregation createStatisticsAggregation(TableStatisticsMeta for (ColumnStatisticMetadata columnStatisticMetadata : statisticsMetadata.getColumnStatistics()) { String columnName = columnStatisticMetadata.getColumnName(); Symbol inputSymbol = columnToSymbolMap.get(columnName); - verifyNotNull(inputSymbol, "inputSymbol is null"); + verifyNotNull(inputSymbol, "no symbol for [%s] column, these columns exist: %s", columnName, columnToSymbolMap.keySet()); Type inputType = symbolAllocator.getTypes().get(inputSymbol); verifyNotNull(inputType, "inputType is null for symbol: %s", inputSymbol); ColumnStatisticsAggregation aggregation; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 799b05b95025..352b18f6c684 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -68,6 +68,7 @@ public class DeltaLakeConfig private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS); private boolean tableStatisticsEnabled = true; private boolean extendedStatisticsEnabled = true; + private boolean collectExtendedStatisticsOnWrite = true; private HiveCompressionCodec compressionCodec = HiveCompressionCodec.SNAPPY; private long perTransactionMetastoreCacheMaximumSize = 1000; private boolean deleteSchemaLocationsFallback; @@ -346,6 +347,19 @@ public DeltaLakeConfig setExtendedStatisticsEnabled(boolean extendedStatisticsEn return this; } + public boolean isCollectExtendedStatisticsOnWrite() + { + return collectExtendedStatisticsOnWrite; + } + + @Config("delta.extended-statistics.collect-on-write") + @ConfigDescription("Enables automatic column level extended statistics collection on write") + public DeltaLakeConfig setCollectExtendedStatisticsOnWrite(boolean collectExtendedStatisticsOnWrite) + { + this.collectExtendedStatisticsOnWrite = collectExtendedStatisticsOnWrite; + return this; + } + @NotNull public HiveCompressionCodec getCompressionCodec() { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 60d8058227ae..fb27700b6641 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -138,6 +138,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; @@ -152,7 +153,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static com.google.common.collect.MoreCollectors.onlyElement; +import static com.google.common.collect.MoreCollectors.toOptional; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.MERGE_ROW_ID_TYPE; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME; @@ -164,6 +165,7 @@ import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; +import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isCollectExtendedStatisticsColumnStatisticsOnWrite; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isLegacyCreateTableWithExistingLocationEnabled; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isTableStatisticsEnabled; @@ -994,6 +996,21 @@ public Optional finishCreateTable( DEFAULT_PROTOCOL); appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), true); transactionLogWriter.flush(); + + if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty()) { + Optional maxFileModificationTime = dataFileInfos.stream() + .map(DataFileInfo::getCreationTime) + .max(Long::compare) + .map(Instant::ofEpochMilli); + + updateTableStatistics( + session, + Optional.empty(), + location, + maxFileModificationTime, + computedStatistics); + } + PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow()); try { @@ -1100,7 +1117,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl ImmutableMap.Builder columnComments = ImmutableMap.builder(); columnComments.putAll(getColumnComments(deltaLakeTableHandle.getMetadataEntry()).entrySet().stream() .filter(e -> !e.getKey().equals(deltaLakeColumnHandle.getName())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + .collect(Collectors.toMap(Entry::getKey, Entry::getValue))); comment.ifPresent(s -> columnComments.put(deltaLakeColumnHandle.getName(), s)); TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, deltaLakeTableHandle.getLocation()); @@ -1972,7 +1989,7 @@ public Optional> applyFilter(C ImmutableMap.Builder enforceableDomains = ImmutableMap.builder(); ImmutableMap.Builder unenforceableDomains = ImmutableMap.builder(); - for (Map.Entry domainEntry : constraintDomains.entrySet()) { + for (Entry domainEntry : constraintDomains.entrySet()) { DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) domainEntry.getKey(); if (!partitionColumns.contains(column)) { unenforceableDomains.put(column, domainEntry.getValue()); @@ -2081,6 +2098,10 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession alreadyAnalyzedModifiedTimeMax.orElse(EPOCH))); } + List columnsMetadata = extractColumnMetadata(metadata, typeManager); + Set allColumnNames = columnsMetadata.stream() + .map(ColumnMetadata::getName) + .collect(toImmutableSet()); Optional> analyzeColumnNames = DeltaLakeAnalyzeProperties.getColumnNames(analyzeProperties); if (analyzeColumnNames.isPresent()) { Set columnNames = analyzeColumnNames.get(); @@ -2089,9 +2110,6 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession throw new TrinoException(INVALID_ANALYZE_PROPERTY, "Cannot specify empty list of columns for analysis"); } - Set allColumnNames = extractColumnMetadata(metadata, typeManager).stream() - .map(ColumnMetadata::getName) - .collect(toImmutableSet()); if (!allColumnNames.containsAll(columnNames)) { throw new TrinoException( INVALID_ANALYZE_PROPERTY, @@ -2124,31 +2142,62 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession handle.getReadVersion(), false); + TableStatisticsMetadata statisticsMetadata = getStatisticsCollectionMetadata( + statistics, + columnsMetadata, + analyzeColumnNames.orElse(allColumnNames), + true); + + return new ConnectorAnalyzeMetadata(newHandle, statisticsMetadata); + } + + @Override + public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + if (!isCollectExtendedStatisticsColumnStatisticsOnWrite(session)) { + return TableStatisticsMetadata.empty(); + } + + Set allColumnNames = tableMetadata.getColumns().stream() + .map(ColumnMetadata::getName) + .collect(toImmutableSet()); + + return getStatisticsCollectionMetadata( + Optional.empty(), + tableMetadata.getColumns(), + allColumnNames, + // File modified time does not need to be collected as a statistics because it gets derived directly from files being written + false); + } + + private TableStatisticsMetadata getStatisticsCollectionMetadata( + Optional existingStatistics, + List tableColumns, + Set analyzeColumnNames, + boolean includeMaxFileModifiedTime) + { ImmutableSet.Builder columnStatistics = ImmutableSet.builder(); - extractColumnMetadata(metadata, typeManager).stream() + tableColumns.stream() .filter(DeltaLakeMetadata::shouldCollectExtendedStatistics) - .filter(columnMetadata -> - analyzeColumnNames - .map(columnNames -> columnNames.contains(columnMetadata.getName())) - .orElse(true)) + .filter(columnMetadata -> analyzeColumnNames.contains(columnMetadata.getName())) .forEach(columnMetadata -> { if (!(columnMetadata.getType() instanceof FixedWidthType)) { - if (statistics.isEmpty() || totalSizeStatisticsExists(statistics.get().getColumnStatistics(), columnMetadata.getName())) { + if (existingStatistics.isEmpty() || totalSizeStatisticsExists(existingStatistics.get().getColumnStatistics(), columnMetadata.getName())) { columnStatistics.add(new ColumnStatisticMetadata(columnMetadata.getName(), TOTAL_SIZE_IN_BYTES)); } } columnStatistics.add(new ColumnStatisticMetadata(columnMetadata.getName(), NUMBER_OF_DISTINCT_VALUES_SUMMARY)); }); - // collect max(file modification time) for sake of incremental ANALYZE - columnStatistics.add(new ColumnStatisticMetadata(FILE_MODIFIED_TIME_COLUMN_NAME, MAX_VALUE)); + if (includeMaxFileModifiedTime) { + // collect max(file modification time) for sake of incremental ANALYZE + columnStatistics.add(new ColumnStatisticMetadata(FILE_MODIFIED_TIME_COLUMN_NAME, MAX_VALUE)); + } - TableStatisticsMetadata statisticsMetadata = new TableStatisticsMetadata( + return new TableStatisticsMetadata( columnStatistics.build(), ImmutableSet.of(), ImmutableList.of()); - - return new ConnectorAnalyzeMetadata(newHandle, statisticsMetadata); } private static boolean shouldCollectExtendedStatistics(ColumnMetadata columnMetadata) @@ -2181,6 +2230,22 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table; AnalyzeHandle analyzeHandle = tableHandle.getAnalyzeHandle().orElseThrow(() -> new IllegalArgumentException("analyzeHandle not set")); String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session); + Optional maxFileModificationTime = getMaxFileModificationTime(computedStatistics); + updateTableStatistics( + session, + Optional.of(analyzeHandle), + location, + maxFileModificationTime, + computedStatistics); + } + + private void updateTableStatistics( + ConnectorSession session, + Optional analyzeHandle, + String location, + Optional maxFileModificationTime, + Collection computedStatistics) + { Optional oldStatistics = statisticsAccess.readExtendedStatistics(session, location); // more elaborate logic for handling statistics model evaluation may need to be introduced in the future @@ -2194,19 +2259,18 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH .orElseGet(ImmutableMap::of); Map newColumnStatistics = toDeltaLakeColumnStatistics(computedStatistics); - Map mergedColumnStatistics = new HashMap<>(); - - // only keep stats for existing columns - Set newColumns = newColumnStatistics.keySet(); - oldColumnStatistics.entrySet().stream() - .filter(entry -> newColumns.contains(entry.getKey())) - .forEach(entry -> mergedColumnStatistics.put(entry.getKey(), entry.getValue())); - - newColumnStatistics.forEach((columnName, columnStatistics) -> { - mergedColumnStatistics.merge(columnName, columnStatistics, DeltaLakeColumnStatistics::update); - }); + Map mergedColumnStatistics = newColumnStatistics.entrySet().stream() + .collect(toImmutableMap( + Entry::getKey, + entry -> { + String columnName = entry.getKey(); + DeltaLakeColumnStatistics newStats = entry.getValue(); + DeltaLakeColumnStatistics oldStats = oldColumnStatistics.get(columnName); + return oldStats == null + ? newStats + : oldStats.update(newStats); + })); - Optional maxFileModificationTime = getMaxFileModificationTime(computedStatistics); // We do not want to hinder our future calls to ANALYZE if one of the files we analyzed have modification time far in the future. // Therefore we cap the value we store in extended_stats.json to current_time as observed on Trino coordinator. Instant finalAlreadyAnalyzedModifiedTimeMax = Instant.now(); @@ -2218,18 +2282,17 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH finalAlreadyAnalyzedModifiedTimeMax = Comparators.max(oldStatistics.get().getAlreadyAnalyzedModifiedTimeMax(), finalAlreadyAnalyzedModifiedTimeMax); } - if (analyzeHandle.getColumns().isPresent() && !mergedColumnStatistics.keySet().equals(analyzeHandle.getColumns().get())) { - // sanity validation - throw new IllegalStateException( - format("Unexpected columns in in mergedColumnStatistics %s; expected %s", - mergedColumnStatistics.keySet(), - analyzeHandle.getColumns().get())); - } + analyzeHandle.flatMap(AnalyzeHandle::getColumns).ifPresent(analyzeColumns -> { + if (!mergedColumnStatistics.keySet().equals(analyzeColumns)) { + // sanity validation + throw new IllegalStateException(format("Unexpected columns in in mergedColumnStatistics %s; expected %s", mergedColumnStatistics.keySet(), analyzeColumns)); + } + }); ExtendedStatistics mergedExtendedStatistics = new ExtendedStatistics( finalAlreadyAnalyzedModifiedTimeMax, mergedColumnStatistics, - analyzeHandle.getColumns()); + analyzeHandle.flatMap(AnalyzeHandle::getColumns)); statisticsAccess.updateExtendedStatistics(session, location, mergedExtendedStatistics); } @@ -2338,7 +2401,7 @@ private static Map toDeltaLakeColumnStatistic // Only statistics for whole table are collected ComputedStatistics singleStatistics = Iterables.getOnlyElement(computedStatistics); return createColumnToComputedStatisticsMap(singleStatistics.getColumnStatistics()).entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> createDeltaLakeColumnStatistics(entry.getValue()))); + .collect(toImmutableMap(Entry::getKey, entry -> createDeltaLakeColumnStatistics(entry.getValue()))); } private static Map> createColumnToComputedStatisticsMap(Map computedStatistics) @@ -2391,17 +2454,17 @@ private static Optional getMaxFileModificationTime(Collection entry.getKey().getColumnName().equals(FILE_MODIFIED_TIME_COLUMN_NAME)) - .map(entry -> { + .flatMap(entry -> { ColumnStatisticMetadata columnStatisticMetadata = entry.getKey(); if (columnStatisticMetadata.getStatisticType() != MAX_VALUE) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unexpected statistics collection: " + columnStatisticMetadata); } if (entry.getValue().isNull(0)) { - return Optional.empty(); + return Stream.of(); } - return Optional.of(Instant.ofEpochMilli(unpackMillisUtc(TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS.getLong(entry.getValue(), 0)))); + return Stream.of(Instant.ofEpochMilli(unpackMillisUtc(TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS.getLong(entry.getValue(), 0)))); }) - .collect(onlyElement()); + .collect(toOptional()); } public DeltaLakeMetastore getMetastore() diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java index d0a986b157c7..cd3955db6ac2 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java @@ -61,6 +61,7 @@ public final class DeltaLakeSessionProperties private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout"; private static final String TABLE_STATISTICS_ENABLED = "statistics_enabled"; public static final String EXTENDED_STATISTICS_ENABLED = "extended_statistics_enabled"; + public static final String EXTENDED_STATISTICS_COLLECT_ON_WRITE = "extended_statistics_collect_on_write"; public static final String LEGACY_CREATE_TABLE_WITH_EXISTING_LOCATION_ENABLED = "legacy_create_table_with_existing_location_enabled"; private final List> sessionProperties; @@ -163,6 +164,11 @@ public DeltaLakeSessionProperties( "Enable using the CREATE TABLE statement to register an existing table", deltaLakeConfig.isLegacyCreateTableWithExistingLocationEnabled(), false), + booleanProperty( + EXTENDED_STATISTICS_COLLECT_ON_WRITE, + "Enables automatic column level extended statistics collection on write", + deltaLakeConfig.isCollectExtendedStatisticsOnWrite(), + false), enumProperty( COMPRESSION_CODEC, "Compression codec to use when writing new data files", @@ -258,6 +264,11 @@ public static boolean isLegacyCreateTableWithExistingLocationEnabled(ConnectorSe return session.getProperty(LEGACY_CREATE_TABLE_WITH_EXISTING_LOCATION_ENABLED, Boolean.class); } + public static boolean isCollectExtendedStatisticsColumnStatisticsOnWrite(ConnectorSession session) + { + return session.getProperty(EXTENDED_STATISTICS_COLLECT_ON_WRITE, Boolean.class); + } + public static HiveCompressionCodec getCompressionCodec(ConnectorSession session) { return session.getProperty(COMPRESSION_CODEC, HiveCompressionCodec.class); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index a948114485e7..902574350fac 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -59,6 +59,7 @@ import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING; import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; +import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.EXTENDED_STATISTICS_COLLECT_ON_WRITE; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DELETE_TABLE; @@ -513,26 +514,21 @@ tableName, getLocationForTable(bucketName, tableName)), @Test public void testCreateTableAsStatistics() { + String tableName = "test_ctats_stats_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + + " WITH (" + + "location = '" + getLocationForTable(bucketName, tableName) + "'" + + ")" + + " AS SELECT * FROM tpch.sf1.nation", 25); + assertQuery( - "SHOW STATS FOR lineitem", + "SHOW STATS FOR " + tableName, "VALUES " + - "('orderkey', NULL, NULL, 0.0, NULL, '1', '60000')," + - "('partkey', NULL, NULL, 0.0, NULL, '1', '2000')," + - "('suppkey', NULL, NULL, 0.0, NULL, '1', '100')," + - "('linenumber', NULL, NULL, 0.0, NULL, '1', '7')," + - "('quantity', NULL, NULL, 0.0, NULL, '1.0', '50.0')," + - "('extendedprice', NULL, NULL, 0.0, NULL, '904.0', '94949.5')," + - "('discount', NULL, NULL, 0.0, NULL, '0.0', '0.1')," + - "('tax', NULL, NULL, 0.0, NULL, '0.0', '0.08')," + - "('returnflag', NULL, NULL, 0.0, NULL, NULL, NULL)," + - "('linestatus', NULL, NULL, 0.0, NULL, NULL, NULL)," + - "('shipdate', NULL, NULL, 0.0, NULL, '1992-01-04', '1998-11-29')," + - "('commitdate', NULL, NULL, 0.0, NULL, '1992-02-02', '1998-10-28')," + - "('receiptdate', NULL, NULL, 0.0, NULL, '1992-01-09', '1998-12-25')," + - "('shipinstruct', NULL, NULL, 0.0, NULL, NULL, NULL)," + - "('shipmode', NULL, NULL, 0.0, NULL, NULL, NULL)," + - "('comment', NULL, NULL, 0.0, NULL, NULL, NULL)," + - "(NULL, NULL, NULL, NULL, 60175.0, NULL, NULL)"); + "('nationkey', null, 25.0, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, 0, 4)," + + "('comment', 1857.0, 25.0, 0.0, null, null, null)," + + "('name', 177.0, 25.0, 0.0, null, null, null)," + + "(null, null, null, null, 25.0, null, null)"); } @Test @@ -1364,7 +1360,10 @@ private void testDeltaLakeTableLocationChanged(boolean fewerEntries, boolean fir public void testAnalyze() { String tableName = "test_analyze_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + Session sessionWithDisabledStatisticsOnWrite = Session.builder(getSession()) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), EXTENDED_STATISTICS_COLLECT_ON_WRITE, "false") + .build(); + assertUpdate(sessionWithDisabledStatisticsOnWrite, "CREATE TABLE " + tableName + " WITH (" + "location = '" + getLocationForTable(bucketName, tableName) + "'" + ")" diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java index 563beb95d77d..462d32ab0757 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java @@ -31,6 +31,7 @@ import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.TPCH_SCHEMA; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createDeltaLakeQueryRunner; +import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.EXTENDED_STATISTICS_COLLECT_ON_WRITE; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.INSERT_TABLE; import static io.trino.testing.TestingAccessControlManager.privilege; import static io.trino.testing.TestingNames.randomNameSuffix; @@ -71,17 +72,6 @@ private void testAnalyze(Optional checkpointInterval) + (checkpointInterval.isPresent() ? format(" WITH (checkpoint_interval = %s)", checkpointInterval.get()) : "") + " AS SELECT * FROM tpch.sf1.nation", 25); - assertQuery( - "SHOW STATS FOR " + tableName, - "VALUES " + - "('nationkey', null, null, 0.0, null, 0, 24)," + - "('regionkey', null, null, 0.0, null, 0, 4)," + - "('comment', null, null, 0.0, null, null, null)," + - "('name', null, null, 0.0, null, null, null)," + - "(null, null, null, null, 25.0, null, null)"); - - runAnalyzeVerifySplitCount(tableName, 1); - assertQuery( "SHOW STATS FOR " + tableName, "VALUES " + @@ -156,13 +146,13 @@ public void testAnalyzePartitioned() assertQuery( "SHOW STATS FOR " + tableName, "VALUES " + - "('nationkey', null, null, 0.0, null, 0, 24)," + + "('nationkey', null, 25.0, 0.0, null, 0, 24)," + "('regionkey', null, 5.0, 0.0, null, null, null)," + - "('comment', null, null, 0.0, null, null, null)," + - "('name', null, null, 0.0, null, null, null)," + + "('comment', 1857.0, 25.0, 0.0, null, null, null)," + + "('name', 177.0, 25.0, 0.0, null, null, null)," + "(null, null, null, null, 25.0, null, null)"); - runAnalyzeVerifySplitCount(tableName, 5); + runAnalyzeVerifySplitCount(tableName, 1); assertQuery( "SHOW STATS FOR " + tableName, @@ -276,7 +266,10 @@ public void testAnalyzeWithFilesModifiedAfter() { String tableName = "test_analyze_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.nation", 25); + assertUpdate( + disableStatisticsCollectionOnWrite(getSession()), + "CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.nation", + 25); Thread.sleep(100); Instant afterInitialDataIngestion = Instant.now(); @@ -407,10 +400,6 @@ public void testDropExtendedStats() + "('name', 177.0, 25.0, 0.0, null, null, null)," + "(null, null, null, null, 25.0, null, null)"; - assertQuery(query, baseStats); - - // Update stats to include distinct count - runAnalyzeVerifySplitCount(table.getName(), 1); assertQuery(query, extendedStats); // Dropping extended stats clears distinct count and leaves other stats alone @@ -479,6 +468,70 @@ public void testStatsOnTpcDsData() } } + @Test + public void testCreateTableStatisticsWhenCollectionOnWriteDisabled() + { + String tableName = "test_statistics_" + randomNameSuffix(); + assertUpdate( + disableStatisticsCollectionOnWrite(getSession()), + "CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.nation", + 25); + + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, null, 0.0, null, 0, 24)," + + "('regionkey', null, null, 0.0, null, 0, 4)," + + "('comment', null, null, 0.0, null, null, null)," + + "('name', null, null, 0.0, null, null, null)," + + "(null, null, null, null, 25.0, null, null)"); + + assertUpdate("ANALYZE " + tableName); + + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 25.0, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, 0, 4)," + + "('comment', 1857.0, 25.0, 0.0, null, null, null)," + + "('name', 177.0, 25.0, 0.0, null, null, null)," + + "(null, null, null, null, 25.0, null, null)"); + } + + @Test + public void testCreatePartitionedTableStatisticsWhenCollectionOnWriteDisabled() + { + String tableName = "test_statistics_" + randomNameSuffix(); + assertUpdate( + disableStatisticsCollectionOnWrite(getSession()), + "CREATE TABLE " + tableName + + " WITH (" + + " partitioned_by = ARRAY['regionkey']" + + ")" + + "AS SELECT * FROM tpch.sf1.nation", + 25); + + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, null, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, null, null)," + + "('comment', null, null, 0.0, null, null, null)," + + "('name', null, null, 0.0, null, null, null)," + + "(null, null, null, null, 25.0, null, null)"); + + assertUpdate("ANALYZE " + tableName); + + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 25.0, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, null, null)," + + "('comment', 1857.0, 25.0, 0.0, null, null, null)," + + "('name', 177.0, 25.0, 0.0, null, null, null)," + + "(null, null, null, null, 25.0, null, null)"); + } + private void runAnalyzeVerifySplitCount(String tableName, long expectedSplitCount) { MaterializedResultWithQueryId analyzeResult = getDistributedQueryRunner().executeWithQueryId(getSession(), "ANALYZE " + tableName); @@ -502,4 +555,11 @@ private OperatorStats getOperatorStats(QueryId queryId) .filter(summary -> summary.getOperatorType().contains("Scan")) .collect(onlyElement()); } + + private static Session disableStatisticsCollectionOnWrite(Session session) + { + return Session.builder(session) + .setCatalogSessionProperty(session.getCatalog().orElseThrow(), EXTENDED_STATISTICS_COLLECT_ON_WRITE, "false") + .build(); + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index 3108c8bd97d1..93ee7fade662 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -59,6 +59,7 @@ public void testDefaults() .setDynamicFilteringWaitTimeout(new Duration(0, SECONDS)) .setTableStatisticsEnabled(true) .setExtendedStatisticsEnabled(true) + .setCollectExtendedStatisticsOnWrite(true) .setCompressionCodec(HiveCompressionCodec.SNAPPY) .setDeleteSchemaLocationsFallback(false) .setParquetTimeZone(TimeZone.getDefault().getID()) @@ -93,6 +94,7 @@ public void testExplicitPropertyMappings() .put("delta.dynamic-filtering.wait-timeout", "30m") .put("delta.table-statistics-enabled", "false") .put("delta.extended-statistics.enabled", "false") + .put("delta.extended-statistics.collect-on-write", "false") .put("delta.compression-codec", "GZIP") .put("delta.per-transaction-metastore-cache-maximum-size", "500") .put("delta.delete-schema-locations-fallback", "true") @@ -124,6 +126,7 @@ public void testExplicitPropertyMappings() .setDynamicFilteringWaitTimeout(new Duration(30, MINUTES)) .setTableStatisticsEnabled(false) .setExtendedStatisticsEnabled(false) + .setCollectExtendedStatisticsOnWrite(false) .setCompressionCodec(HiveCompressionCodec.GZIP) .setDeleteSchemaLocationsFallback(true) .setParquetTimeZone(nonDefaultTimeZone().getID()) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDelete.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDelete.java index 12a2a3bc4f5f..c278f84ea1e2 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDelete.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDelete.java @@ -216,16 +216,16 @@ public void testStatsAfterDelete() 4); assertQuery("SHOW STATS FOR " + tableName, "VALUES " + - "('a', null, null, 0.5, null, 1, 7)," + - "('b', null, null, 0.5, null, 3, 9)," + - "('c', null, null, 0.75, null, 5, 5)," + + "('a', null, 2.0, 0.5, null, 1, 7)," + + "('b', null, 2.0, 0.5, null, 3, 9)," + + "('c', null, 1.0, 0.75, null, 5, 5)," + "(null, null, null, null, 4.0, null, null)"); assertUpdate("DELETE FROM " + tableName + " WHERE c IS NULL", 3); assertQuery("SHOW STATS FOR " + tableName, "VALUES " + - "('a', null, null, 0.0, null, 1, 1)," + - "('b', null, null, 0.0, null, 3, 3)," + - "('c', null, null, 0.0, null, 5, 5)," + + "('a', null, 1.0, 0.0, null, 1, 1)," + + "('b', null, 1.0, 0.0, null, 3, 3)," + + "('c', null, 1.0, 0.0, null, 5, 5)," + "(null, null, null, null, 1.0, null, null)"); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableStatistics.java index 70aed3235a9c..bd3e1aa547fd 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableStatistics.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableStatistics.java @@ -74,7 +74,7 @@ public void testShowStatsForTableWithNullsInPartitioningColumn() "VALUES " + // column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value "('pk', null, 1.0, 0.5, null, null, null)," + - "('val_col', null, null, 0.0, null, 23, 24)," + + "('val_col', null, 2.0, 0.0, null, 23, 24)," + "(null, null, null, null, 2.0, null, null)"); } @@ -96,7 +96,7 @@ public void testShowStatsForTableWithTwoPartitioningColumns() // column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value "('pk1', null, 1.0, 0.25, null, null, null)," + "('pk2', null, 3.0, 0.0, null, null, null)," + - "('val_col', null, null, 0.0, null, 23, 26)," + + "('val_col', null, 4.0, 0.0, null, 23, 26)," + "(null, null, null, null, 4.0, null, null)"); } @@ -114,7 +114,7 @@ public void testShowStatsForPartitioningColumnThatOnlyHasNulls() "VALUES " + // column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value "('pk1', 0.0, 0.0, 1.0, null, null, null)," + - "('val_col', null, null, 0.0, null, 23, 24)," + + "('val_col', null, 2.0, 0.0, null, 23, 24)," + "(null, null, null, null, 2.0, null, null)"); } @@ -136,7 +136,7 @@ public void testShowStatsForQueryWithWhereClause() // column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value "('pk1', null, 1.0, 0.0, null, null, null)," + "('pk2', null, 3.0, 0.0, null, null, null)," + - "('val_col', null, null, 0.0, null, 23, 26)," + + "('val_col', null, 3.0, 0.0, null, 23, 26)," + "(null, null, null, null, 3.0, null, null)"); } @@ -148,7 +148,7 @@ public void testShowStatsForAllNullColumn() "SHOW STATS FOR show_stats_with_null", "VALUES " + // column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value - "('col', 0.0, null, 1.0, null, null, null)," + + "('col', 0.0, 0.0, 1.0, null, null, null)," + "(null, null, null, null, 1.0, null, null)"); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 84c931a49f4b..0d3c694e469f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -3231,7 +3231,7 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Connector return TableStatisticsMetadata.empty(); } if (isTransactional(tableMetadata.getProperties()).orElse(false)) { - // TODO(https://github.com/trinodb/trino/issues/1956) updating table statistics for trasactional not supported right now. + // TODO(https://github.com/trinodb/trino/issues/1956) updating table statistics for transactional not supported right now. return TableStatisticsMetadata.empty(); } List partitionedBy = firstNonNull(getPartitionedBy(tableMetadata.getProperties()), ImmutableList.of());