Collect Delta extended statistics when creating table #15878
Conversation
rebase on master to use CI fix #15879
Looks pretty good overall. Couple questions/nitpicks
@@ -68,6 +68,7 @@
     private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
     private boolean tableStatisticsEnabled = true;
     private boolean extendedStatisticsEnabled = true;
+    private boolean collectExtendedStatisticsColumnStatisticsOnWrite = true;
Maybe just collectExtendedColumnStatisticsOnWrite?
Set<String> allColumnNames = extractColumnMetadata(metadata, typeManager).stream()
        .map(ColumnMetadata::getName)
        .collect(toImmutableSet());
Save the result of extractColumnMetadata so that you don't have to call it again at the bottom of this method.
Suggested change:
-Set<String> allColumnNames = extractColumnMetadata(metadata, typeManager).stream()
-        .map(ColumnMetadata::getName)
-        .collect(toImmutableSet());
+List<ColumnMetadata> columnMetadata = extractColumnMetadata(metadata, typeManager);
+Set<String> allColumnNames = columnMetadata.stream()
+        .map(ColumnMetadata::getName)
+        .collect(toImmutableSet());
@@ -2124,31 +2135,61 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
        handle.getReadVersion(),
        false);

TableStatisticsMetadata statisticsMetadata = getStatisticsCollectionMetadata(
        statistics,
        extractColumnMetadata(metadata, typeManager),
Per other comment, don't have to call extractColumnMetadata again.
        Optional.empty(),
        tableMetadata.getColumns(),
        allColumnNames,
        false);
Why not includeMaxFileModifiedTime in this situation?
Statistics aggregation during table creation does not have information about file_modified_time yet.
Right, right. Then if the modified time isn't present we just use the current time when the collection is done. Makes sense.
Can you please add a code comment explaining this consideration?
What do we need to have this information available?
@@ -1361,7 +1361,7 @@ private void testDeltaLakeTableLocationChanged(boolean fewerEntries, boolean fir
  * testing in {@link TestDeltaLakeAnalyze}.
  */
 @Test
-public void testAnalyze()
+public void testStatisticsGenerationDuringTableCreation()
We should still test the old thing too
agreed
@@ -147,22 +139,24 @@ private void testAnalyze(Optional<Integer> checkpointInterval)
 public void testAnalyzePartitioned()
 {
     String tableName = "test_analyze_" + randomNameSuffix();
-    assertUpdate("CREATE TABLE " + tableName
+    assertUpdate(
Please do create a compatibility test with Spark to verify that after a CTAS, DESC EXTENDED works as intended on Databricks.
Nevermind. Trino Delta Lake (on the storage layer) & Databricks (on the metastore properties) have outputs in different places.
    return collectExtendedStatisticsColumnStatisticsOnWrite;
}

@Config("delta.extended-statistics.collect-on-write")
Do consider documenting this new property in delta-lake.rst - either in this PR or a follow-up PR.
I would wait with the documentation until other write operations are implemented, if that's OK.
👍
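For readers who want to see how such a toggle gets wired up, the property binds through Airlift's configuration annotations. Below is a trimmed-down sketch using the field name suggested earlier in this review; only the new property is shown, and the description text is an assumption rather than the PR's exact diff.

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;

public class DeltaLakeConfig
{
    private boolean collectExtendedColumnStatisticsOnWrite = true;

    public boolean isCollectExtendedColumnStatisticsOnWrite()
    {
        return collectExtendedColumnStatisticsOnWrite;
    }

    @Config("delta.extended-statistics.collect-on-write")
    @ConfigDescription("Collect extended column statistics when data is written to the table") // wording assumed for illustration
    public DeltaLakeConfig setCollectExtendedColumnStatisticsOnWrite(boolean collectExtendedColumnStatisticsOnWrite)
    {
        this.collectExtendedColumnStatisticsOnWrite = collectExtendedColumnStatisticsOnWrite;
        return this;
    }
}

A delta-lake.rst entry would then describe delta.extended-statistics.collect-on-write alongside the existing extended-statistics properties.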
@@ -388,8 +382,7 @@ public void testAnalyzeSomeColumns()
 @Test
 public void testDropExtendedStats()
 {
-    try (TestTable table = new TestTable(
-            getQueryRunner()::execute,
+    try (TestTable table = new TestTable(getQueryRunner()::execute,
nit: There's no need to change this line. I would revert.
Reduce map iterations and lookups to a minimum, while also simplifying the code flow.
        mergedColumnStatistics.keySet(),
        analyzeHandle.getColumns().get()));
}
analyzeHandle.flatMap(AnalyzeHandle::getColumns).ifPresent(analyzeColumns -> {
nit: this kind of cosmetic change can be done in a separate commit.
@@ -2402,7 +2451,8 @@ private static Optional<Instant> getMaxFileModificationTime(Collection<ComputedS
            }
            return Optional.of(Instant.ofEpochMilli(unpackMillisUtc(TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS.getLong(entry.getValue(), 0))));
        })
-       .collect(onlyElement());
+       .collect(toOptional())
separate commit
This change makes sense only with this commit, as it allows the collection to have 0 elements; before this commit that would throw an exception.
        Optional.empty(),
        tableMetadata.getColumns(),
        allColumnNames,
        false); // File modified time is not available during planning phase as table is not created yet. Time is added during statistics update.
"Time is added during statistics update." Do you mean the maximum file modified time?
        .max(Long::compare)
        .map(Instant::ofEpochMilli);

updateTableStatistics(session,
updateTableStatistics(
        session,
private void updateTableStatistics(ConnectorSession session, Optional<AnalyzeHandle> analyzeHandle, String location, Optional<Instant> maxFileModificationTime,
        Collection<ComputedStatistics> computedStatistics)
This line is now over the line length limit, so either put all arguments on one line, or each on a separate line.
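For illustration, the "each argument on its own line" variant of that signature would look like the sketch below; the parameter list is copied from the snippet above, only the formatting changes.

private void updateTableStatistics(
        ConnectorSession session,
        Optional<AnalyzeHandle> analyzeHandle,
        String location,
        Optional<Instant> maxFileModificationTime,
        Collection<ComputedStatistics> computedStatistics)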
@@ -2410,7 +2458,8 @@ private static Optional<Instant> getMaxFileModificationTime(Collection<ComputedS
            }
            return Optional.of(Instant.ofEpochMilli(unpackMillisUtc(TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS.getLong(entry.getValue(), 0))));
        })
-       .collect(onlyElement());
+       .collect(toOptional())
+       .flatMap(identity());
That's a minimal change, but it's not how you'd write the code if you were writing it anew.
.flatMap(entry -> {
    ....
    if (....) {
        return Stream.of();
    }
    return Stream.of(Instant.ofEpochMilli(....));
})
.collect(toOptional());
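For context, Guava's MoreCollectors.toOptional() returns the stream's sole element, or an empty Optional for an empty stream, so flat-mapping absent values away avoids the Optional-inside-Optional plus flatMap(identity()) step. The following is a self-contained sketch of that pattern; the map shape and the key name are illustrative stand-ins, not the connector's actual types.

import static com.google.common.collect.MoreCollectors.toOptional;

import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

public final class MaxFileModificationTimeSketch
{
    private MaxFileModificationTimeSketch() {}

    // Hypothetical input: epoch-millis values keyed by statistic name, where null means "not available".
    static Optional<Instant> getMaxFileModificationTime(Map<String, Long> statistics)
    {
        return statistics.entrySet().stream()
                .filter(entry -> entry.getKey().equals("$file_modified_time")) // illustrative key name
                .flatMap(entry -> {
                    Long millis = entry.getValue();
                    if (millis == null) {
                        // Drop the absent value instead of wrapping it in a nested Optional
                        return Stream.<Instant>empty();
                    }
                    return Stream.of(Instant.ofEpochMilli(millis));
                })
                .collect(toOptional());
    }
}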
        Optional.empty(),
        tableMetadata.getColumns(),
        allColumnNames,
        false); // File modified time is not available during planning phase for writes. Maximum file modification time is obtained during statistics update.
It sounds like a problem and a workaround, but there isn't a problem:
// File modified time does not need to be collected as a statistic because it gets derived directly from the files being written
false);
@Test
public void testStatisticsGenerationDuringTableCreation()
{
    String tableName = "test_analyze_" + randomNameSuffix();
test_analyze_ -> test_ctas_stats_
@@ -1391,6 +1395,26 @@ public void testAnalyze()
         "(null, null, null, null, 25.0, null, null)");
 }

+@Test
+public void testStatisticsGenerationDuringTableCreation()
Can you paste this method's contents into testCreateTableAsStatistics above? testCreateTableAsStatistics has a good name and a javadoc; it's just that the contents are worse.
"CREATE TABLE " + tableName + " " + | ||
(checkpointInterval.isPresent() ? format(" WITH (checkpoint_interval = %s)", checkpointInterval.get()) : "") + | ||
" AS SELECT * FROM tpch.sf1.nation", | ||
25); |
nit: unrelated fmt change
+ " WITH (" | ||
+ " partitioned_by = ARRAY['regionkey']" | ||
+ ")" | ||
+ "AS SELECT * FROM tpch.sf1.nation", 25); | ||
+ "AS SELECT * FROM tpch.sf1.nation", | ||
25); |
nit: unrelated fmt change
@@ -276,7 +270,7 @@ public void testAnalyzeWithFilesModifiedAfter()
 {
     String tableName = "test_analyze_" + randomNameSuffix();

-    assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.nation", 25);
+    assertUpdate(disableStatisticsCollectionOnWrite(getSession()), "CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.nation", 25);
nit: each arg on separate line
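As a sketch, the reformatted call could look like the lines below; the helper shown after it is hypothetical (its name comes from the diff above, and the catalog session property name is an assumption, not the PR's actual code).

assertUpdate(
        disableStatisticsCollectionOnWrite(getSession()),
        "CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.nation",
        25);

// Hypothetical helper: overrides the catalog session property that mirrors
// delta.extended-statistics.collect-on-write (the property name below is assumed).
private static Session disableStatisticsCollectionOnWrite(Session session)
{
    return Session.builder(session)
            .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "extended_statistics_collect_on_write", "false")
            .build();
}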
"('name', null, null, 0.0, null, null, null)," + | ||
"(null, null, null, null, 25.0, null, null)"); | ||
|
||
runAnalyzeVerifySplitCount(tableName, 5); |
I know it's preexisting, but I don't think we need to assert the split count in every test method here. It blurs the test's intent. (Perhaps we don't need it in any test, I don't know, but I am not requesting any change to existing tests.) This would be better:
assertUpdate("ANALYZE " + tableName);
@pajaks @findinpath @alexjo2144 thank you, this is awesome!
In particular this improves Delta query performance on data sets created in the connector using CTAS.
Description
Collect Delta Lake statistics for CREATE TABLE AS.
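For illustration, in the style of the connector tests discussed above, the end-to-end effect is that a plain CTAS now leaves extended statistics behind without a follow-up ANALYZE. The sketch below is illustrative; the table name prefix and the assertions are not part of the PR.

String tableName = "test_ctas_stats_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.nation", 25);
// With delta.extended-statistics.collect-on-write enabled (the default), the NDV and
// null-fraction columns of SHOW STATS should already be populated at this point.
assertQuerySucceeds("SHOW STATS FOR " + tableName);
assertUpdate("DROP TABLE " + tableName);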
Additional context and related issues
Release notes
(x) Release notes are required, with the following suggested text: