Improve Iceberg deletes when an entire file can be removed #12197
Conversation
Force-pushed from 065dbc7 to ef0b7ae.
Hmm, so that doesn't work, but we have a few options:
Force-pushed from ef0b7ae to b7bc6c0.
@findinpath @findepi I pushed a new approach here, accumulating the number of deleted rows during the writing of position delete files and comparing that to the file's record count. PTAL. Piotr, we had talked offline about using a mechanism similar to
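The accumulate-and-compare idea can be sketched in plain Java. This is a hypothetical stand-in, not the actual Trino classes: while position deletes are written, each data file's deleted-row count is accumulated, and at commit time a file whose count equals its record count can be dropped wholesale.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: count deleted rows per data file while writing
// position deletes; a file whose deleted-row count equals its record
// count was fully deleted by this single operation.
public class DeleteAccumulatorSketch
{
    // data file path -> rows deleted so far in this operation
    private final Map<String, Long> deletedRowsByFile = new HashMap<>();

    public void recordPositionDelete(String dataFilePath, long rowCount)
    {
        deletedRowsByFile.merge(dataFilePath, rowCount, Long::sum);
    }

    // true if every row of the file was deleted by this one operation
    public boolean isFullyDeleted(String dataFilePath, long fileRecordCount)
    {
        return deletedRowsByFile.getOrDefault(dataFilePath, 0L) == fileRecordCount;
    }

    public static void main(String[] args)
    {
        DeleteAccumulatorSketch sketch = new DeleteAccumulatorSketch();
        sketch.recordPositionDelete("s3://bucket/data-1.orc", 10);
        sketch.recordPositionDelete("s3://bucket/data-1.orc", 15);
        System.out.println(sketch.isFullyDeleted("s3://bucket/data-1.orc", 25)); // prints "true"
        System.out.println(sketch.isFullyDeleted("s3://bucket/data-1.orc", 100)); // prints "false"
    }
}
```

Note that counting per operation is what enforces the "all rows must be deleted by one delete operation" restriction: deletes accumulated across earlier snapshots are not visible to this map.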
        }
    }
    catch (IOException e) {
        log.warn(e, "Failed to clean up uncommitted position delete files");
why not propagate here?
Probably because we create delete files for this a few lines below? Though I'd expect propagation too.
Failing to delete a file that is not going to be committed didn't seem like enough of a problem to warrant failing the query.
If we have limited fs permissions and can't delete files, for example, we would still be able to write deletes.
This would eventually get picked up by a remove_orphan_files collection.
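The log-and-continue cleanup discussed here can be sketched as follows. This is a simplified stand-in using `java.nio.file`, not the actual Trino file system API: a failed deletion of an uncommitted file is logged rather than propagated, and the orphan is left for a later remove_orphan_files pass.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical sketch of best-effort cleanup: a delete failure is
// downgraded to a warning because the file was never committed and can
// be reclaimed later by remove_orphan_files.
public class CleanupSketch
{
    public static int cleanUpUncommittedFiles(List<Path> files)
    {
        int deleted = 0;
        for (Path file : files) {
            try {
                if (Files.deleteIfExists(file)) {
                    deleted++;
                }
            }
            catch (IOException e) {
                // Swallow: the query still succeeds; the orphan is collected later
                System.err.printf("Failed to clean up uncommitted file %s: %s%n", file, e);
            }
        }
        return deleted;
    }
}
```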
assertThat(query("SELECT * FROM " + tableName)).returnsEmptyResult();
assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(1);
}
@findinpath @homar please review this class
Correct me if I'm wrong, but here we create the position delete file pretty much the standard way, and then instead of committing it we delete the entire data file if the delete file contains all the rows from it (based on the row count), right? I know preparing the benchmark environment is in progress, but do we plan to benchmark this without the change? I'm just curious how big of an impact it has.
Force-pushed from b7bc6c0 to 4c9f852.
@homar right, the delete itself is not any faster, but read time is improved. Doing it this way means the next read does not need to do any I/O for the deleted file, whereas the old way would read the entire data file and the entire position delete file. I don't have any benchmarks for this, but I would be very surprised if it wasn't an improvement.
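The read-path argument can be reduced to a back-of-the-envelope cost model. The sizes below are illustrative only: with a position delete, a later scan of the fully deleted file still reads the data file plus the delete file; with whole-file removal, the file never appears in the scan's planned files, so the cost is zero.

```java
// Hypothetical cost sketch for scanning a fully deleted file.
public class ReadCostSketch
{
    // Old path: the scan reads the data file and its position delete file
    static long bytesScannedWithPositionDelete(long dataFileBytes, long deleteFileBytes)
    {
        return dataFileBytes + deleteFileBytes;
    }

    // New path: the removed file is never planned, so nothing is read
    static long bytesScannedWithFileRemoval()
    {
        return 0;
    }

    public static void main(String[] args)
    {
        System.out.println(bytesScannedWithPositionDelete(128_000_000L, 1_000_000L)); // prints "129000000"
        System.out.println(bytesScannedWithFileRemoval()); // prints "0"
    }
}
```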
AC. Thanks for the reviews.
Force-pushed from 0891f20 to 3b47b18.
I would also be very surprised. I am just wondering how much of an impact it could have :)
@alexjo2144 please rebase, there is a conflict. I will re-review after that.
Force-pushed from 3b47b18 to 98bdfd4.
Rebased, thanks.
Force-pushed from 98bdfd4 to 2981709.
assertUpdate(
        Session.builder(getSession()).setCatalogSessionProperty("iceberg", "orc_writer_max_stripe_rows", "5").build(),
        "CREATE TABLE " + tableName + " WITH (format = 'ORC') AS SELECT * FROM tpch.tiny.nation", 25);
this.loadTable(tableName).updateProperties().set(SPLIT_SIZE, "100").commit();
What for? Is 100 an OK number? We only have 25 rows.
It's 100 bytes, not rows. I'll add a comment, but this ensures each ORC stripe gets a split by itself.
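The arithmetic behind the test setup above can be sketched quickly: 25 rows written with orc_writer_max_stripe_rows set to 5 yields 5 stripes, and a tiny 100-byte split size keeps each stripe in its own split. The helper below is illustrative, not part of the test code.

```java
// Hypothetical back-of-the-envelope check of the stripe count.
public class StripeMath
{
    // Ceiling division: number of stripes needed for `rows` rows
    static long stripes(long rows, long maxStripeRows)
    {
        return (rows + maxStripeRows - 1) / maxStripeRows;
    }

    public static void main(String[] args)
    {
        System.out.println(stripes(25, 5)); // prints "5"
    }
}
```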
private long getQuerySplits(QueryId queryId)
{
    QueryStats stats = getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(queryId).getQueryStats();
    return stats.getOperatorSummaries().stream()
            .filter(summary -> summary.getOperatorType().equals("ScanFilterAndProjectOperator"))
            .mapToLong(OperatorStats::getTotalDrivers)
            .sum();
}
ResultWithQueryId<MaterializedResult> deletionResult = getDistributedQueryRunner().executeWithQueryId(getSession(), "DELETE FROM " + tableName + " WHERE regionkey < 10");
long deletionSplits = getQuerySplits(deletionResult.getQueryId());
I was hoping to see in the query stats that there are multiple splits for the file, but this wasn't the case. I checked via debug and indeed there are actually ~62 splits of maximum 100B. Any idea how we could retrieve the number of splits in the test case?
There are some Delta tests that check the number of splits, but they are a bit finicky / often flaky so I didn't include one here.
Force-pushed from e767c6c to b0a7e4c.
AC and rebased for conflicts, thanks @findepi.
Force-pushed from b0a7e4c to 815f2bf.
Impressive work 👍
if (!table.getEnforcedPredicate().isAll()) {
    rowDelta.conflictDetectionFilter(toIcebergExpression(table.getEnforcedPredicate()));
}
Map<String, List<CommitTaskData>> deletesByFilePath = commitTasks.stream()
The finishWrite method is now ~150 lines long. Please consider refactoring the method into smaller building blocks to ensure good readability in the weeks/months to come.
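The grouping step quoted above (deletesByFilePath) can be sketched as one small building block in plain Java. The record and method names here are illustrative, not the actual Trino classes: commit tasks are grouped by data file path, then partitioned into files whose accumulated deleted-row count equals the file's record count (drop the data file) versus the rest (commit the position delete).

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of one extracted building block of a long
// finish-write method: group delete tasks per data file, then split
// fully deleted files from partially deleted ones.
public class FinishWriteSketch
{
    record CommitTask(String dataFilePath, long deletedRows, long fileRecordCount) {}

    // key true  -> fully deleted files (remove the whole data file)
    // key false -> partially deleted files (keep the position delete)
    static Map<Boolean, List<String>> partitionFiles(List<CommitTask> tasks)
    {
        Map<String, List<CommitTask>> byPath = tasks.stream()
                .collect(Collectors.groupingBy(CommitTask::dataFilePath));
        return byPath.entrySet().stream()
                .collect(Collectors.partitioningBy(
                        entry -> entry.getValue().stream().mapToLong(CommitTask::deletedRows).sum()
                                == entry.getValue().get(0).fileRecordCount(),
                        Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
    }
}
```

Extracting steps like this one into named methods is what shrinks the main method back to a readable size without changing behavior.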
follow-up
Force-pushed from 815f2bf to a8cc9be.
Added some partitioned table tests. Thanks for the suggestion @findinpath.
Description
If a delete would remove all rows from an individual file,
remove the whole file, rather than writing a position delete.
This does not include situations where a whole file is deleted
across multiple row-level passes. All rows must be deleted by
one delete operation.
Improvement
Iceberg connector
Improves performance when deleting all rows in a file.
Related issues, pull requests, and links
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
(x) Release notes entries required with the following suggested text: