Register VACUUM operations in the delta log #24331
In the description "Solves" -> "Fixes".
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java
    return vacuumLoggingEnabled;
}

@Config("delta.vacuum.logging.enabled")
Why do we need a specific configuration for this?
I came across delta-io/delta@3ac120f
Maybe add this commit to the description of the PR for reference.
BTW, Trino does support concurrent writes on all file systems. (Databricks does not support this out of the box for AWS S3, while Trino does, which means there is a potential for eventual table corruption in a concurrent setting.)
@ebyhr: Please comment. I think the reasoning is that in Spark this is configurable and disabled by default.
I suggested adding this property for compatibility with Spark and also as a kill switch.
The current implementation can still run into conflicts. For instance, the logging may fail if there is a concurrent operation, because the vacuum logic has no retry code.
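To illustrate the missing retry mentioned above, here is a minimal sketch of a bounded retry around a log commit. It is not what this PR implements and it uses no Trino APIs: the class name, `commitWithRetry`, and the `tryCommit` callback are all hypothetical, and the callback is assumed to return `false` when another writer has already claimed the given version.

```java
import java.util.function.LongPredicate;

// Hypothetical sketch, not Trino code: retries a transaction log commit on conflict.
final class VacuumCommitRetrySketch
{
    private VacuumCommitRetrySketch() {}

    // tryCommit is an assumed callback: it attempts to write the log entry for
    // the given version and returns false if that version already exists.
    static long commitWithRetry(long startVersion, int maxAttempts, LongPredicate tryCommit)
    {
        long version = startVersion;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryCommit.test(version)) {
                return version;
            }
            // Conflict: a concurrent operation wrote this version first, so try the next one
            version++;
        }
        throw new IllegalStateException("Failed to commit log entry after " + maxAttempts + " attempts");
    }
}
```

A real implementation would likely also re-read the latest table version and re-check for conflicting changes before each attempt; the sketch only shows the bounded loop.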
There is a funny failure. Why is sizeOfDataToDelete different when running in CI vs. running locally? Shouldn't it be deterministic?
As discussed with @ebyhr offline, made use of
Could you confirm CI failures?
@@ -1746,6 +1750,34 @@ public void testVacuum()
        assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles);
        // old files should be cleaned up
        assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(updatedFiles);
        // operations should be logged
        assertThat(query("SELECT version, operation, MAP_FILTER(operation_parameters, (k, v) -> k <> 'queryId') FROM \"" + tableName + "$history\"")).matches( """
                VALUES\s
Remove redundant \s. Same for other places.
This line still has redundant \s.
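For context on the `\s` comments above: in a Java text block, trailing spaces on a line are stripped, and `\s` is an escape that produces a single space that survives the stripping. After `VALUES` the line break already separates the tokens, so the escape adds nothing for a SQL string. The sketch below shows the difference; the class name and the row values are illustrative assumptions, not taken from the PR.

```java
// Illustrative sketch: the effect of \s at the end of a text block line.
final class TextBlockTrailingSpaceSketch
{
    private TextBlockTrailingSpaceSketch() {}

    // With \s, the first line ends with an explicit space before the newline
    static final String WITH_ESCAPE = """
            VALUES\s
                (1, 'VACUUM')
            """;

    // Without \s, the first line ends right after VALUES
    static final String WITHOUT_ESCAPE = """
            VALUES
                (1, 'VACUUM')
            """;
}
```

The two strings differ only by that one space character, which is why the reviewer considers the escape redundant here.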
}

@Test
public void testVacuumWithParquetTransactionLogging()
Parquet is used for both data files and checkpoints. Please rename to testVacuumTransactionLoggingOnCheckpoint or something.
    testVacuumWithCheckPointInterval(true);
}

private void testVacuumWithCheckPointInterval(boolean isCheckPointInterval) throws InterruptedException {
Put throws InterruptedException and { on new lines.
@@ -1746,6 +1761,34 @@ public void testVacuum()
        assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles);
        // old files should be cleaned up
        assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(updatedFiles);
        // operations should be logged
        assertThat(query("SELECT version, operation, MAP_FILTER(operation_parameters, (k, v) -> k <> 'queryId') FROM \"" + tableName + "$history\"")).matches( """
Please follow the text block format of #23958.
Description
Fixes #24293
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(*) Release notes are required, with the following suggested text: