Add concurrent writes reconciliation for DELETE pushdown/TRUNCATE in Delta Lake #18521
Conversation
return metadata.executeDelete(connectorSession, table.getConnectorHandle());
List<ConnectorTableHandle> sourceConnectorHandles = sourceTableHandles.stream()
        .filter(handle -> handle.getCatalogHandle().equals(catalogHandle))
Any reason why we want only handles from the same catalog?
This is intentional, to ensure that the schema and table name point to the very same table.
If we had catalog1.schema1.table1 and catalog2.schema1.table1, we couldn't tell whether these are one and the same table (they may point to completely different tables even though their schema and table names are the same).
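To illustrate the point, here is a minimal, hypothetical sketch (the `QualifiedTable` record and the literal names are made up for illustration and are not the actual Trino types): comparing only schema and table name conflates two handles that live in different catalogs, while filtering on the catalog removes the ambiguity.

```java
import java.util.List;
import java.util.stream.Stream;

public class CatalogFilterExample
{
    // Hypothetical stand-in for a connector table handle together with its owning catalog
    record QualifiedTable(String catalog, String schema, String table) {}

    public static void main(String[] args)
    {
        QualifiedTable first = new QualifiedTable("catalog1", "schema1", "table1");
        QualifiedTable second = new QualifiedTable("catalog2", "schema1", "table1");

        // Without the catalog, the two handles look identical even though they may
        // point to completely different tables
        boolean sameWithoutCatalog = first.schema().equals(second.schema())
                && first.table().equals(second.table());
        System.out.println(sameWithoutCatalog); // true, which is misleading

        // Filtering source handles to the target catalog removes the ambiguity,
        // mirroring the filter on getCatalogHandle() in the diff above
        String targetCatalog = "catalog1";
        List<QualifiedTable> sourceHandles = Stream.of(first, second)
                .filter(handle -> handle.catalog().equals(targetCatalog))
                .toList();
        System.out.println(sourceHandles); // only catalog1.schema1.table1 remains
    }
}
```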
@@ -64,7 +65,8 @@ public Result apply(TableFinishNode node, Captures captures, Context context)
                .map(newHandle -> new TableDeleteNode(
                        context.getIdAllocator().getNextId(),
                        newHandle,
                        getOnlyElement(node.getOutputSymbols()),
                        ImmutableList.of(tableScan.getTable())))
Isn't `tableScan.getTable()` the target table?
The scanned table is what is being read.
See also a few lines above, which likewise indicates that this is the input table:
metadata.applyDelete(context.getSession(), tableScan.getTable())
@@ -59,15 +61,16 @@ else if (transactionLogEntry.getAdd() != null) {
    addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues());
}
else if (transactionLogEntry.getRemove() != null) {
    removedFilesFound = true;
    Map<String, String> partitionValues = transactionLogEntry.getRemove().partitionValues();
    removedFilesCanonicalPartitionValuesBuilder.add(partitionValues == null ? ImmutableMap.of() : canonicalizePartitionValues(partitionValues));
Why do we add an empty map to the set?
Good catch.
In the unlikely case that the winning commit does not contain information about the partition values of the removed file, we should fail the operation.
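A sketch of the suggested fix, assuming the connector would signal this through a `TrinoException` with the standard `TRANSACTION_CONFLICT` error code; this is illustrative only, not the exact committed change.

```java
// Would additionally need:
// import io.trino.spi.TrinoException;
// import static io.trino.spi.StandardErrorCode.TRANSACTION_CONFLICT;
else if (transactionLogEntry.getRemove() != null) {
    removedFilesFound = true;
    Map<String, String> partitionValues = transactionLogEntry.getRemove().partitionValues();
    if (partitionValues == null) {
        // Without partition values we cannot reason about conflicts with the files we read,
        // so fail the commit attempt instead of recording an empty partition map
        throw new TrinoException(TRANSACTION_CONFLICT,
                "Winning commit contains a remove entry without partition values");
    }
    removedFilesCanonicalPartitionValuesBuilder.add(canonicalizePartitionValues(partitionValues));
}
```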
Should we also fix addFiles in the same way?
@ebyhr pls run the PR with secrets
/test-with-secrets sha=e4de37b413e10dd0a9d255c95a868ccb1b4e4f01
The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/8637812363
/test-with-secrets sha=cda867bfcfe862af396c91026220e33692f5e4ce
The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/8656675392
…e tables" This reverts commit 9d0b9f4.
When retrying a commit, start seeking the latest version of the table from the version read during the latest attempt to commit the insert operation, rather than iterating incrementally from the version read when the table handle was instantiated. In the context of concurrent operations, this strategy spares some unnecessary HEAD operations.
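A minimal sketch of that retry strategy; the helper names (`findLatestTableVersion`, `tryCommit`, `ConcurrentCommitException`) are assumed stand-ins for the real connector code, not actual APIs.

```java
// Start from the version captured when the table handle was instantiated
long readVersion = tableHandleReadVersion;

for (int attempt = 0; attempt < maxCommitRetries; attempt++) {
    // Seek forward only from the version observed during the previous attempt,
    // sparing the HEAD requests needed to re-walk the log from the original read version
    long currentVersion = findLatestTableVersion(tableLocation, readVersion);
    try {
        tryCommit(currentVersion + 1, transactionEntries);
        break;
    }
    catch (ConcurrentCommitException e) {
        readVersion = currentVersion; // the next attempt resumes from here
    }
}
```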
/test-with-secrets sha=10b2d559dfce2926e352613677558fec0bc68e45
The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/8826682787
Changed the title: UPDATE/MERGE/DELETE in Delta Lake → DELETE pushdown/TRUNCATE in Delta Lake
Allow committing pushdown DELETE operations in a concurrent context by placing these operations right after any other previously concurrently completed write operations.

Disallow committing the operation in any of the following cases:
- table schema change has been committed in the meantime
- table protocol change has been committed in the meantime
- add files committed in the meantime should be read by the current operation
- remove files committed in the meantime conflict with the add files read by the current operation

The current changes also take into consideration the `delta.isolationLevel` table property of the Delta Lake table for DELETE operations.

Relevant example taken from the Databricks documentation regarding the distinction between `WriteSerializable` and `Serializable` isolation levels:

> For example, consider `txn1`, a long running delete and `txn2`, which blindly inserts data into the table. `txn2` and `txn1` complete and they are recorded in the order `txn2, txn1` into the history of the table. According to the history, the data inserted in `txn2` should not exist in the table. For the `Serializable` level, a reader would never see data inserted by `txn2`. However, for the `WriteSerializable` level, a reader could at some point see the data inserted by `txn2`.

A few words about the WriteSerializable isolation level, taken from the delta.io javadocs:

> This isolation level will ensure snapshot isolation consistency guarantee between write operations only. In other words, if only the write operations are considered, then there exists a serializable sequence between them that would produce the same result as seen in the table.
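A hedged sketch of the reconciliation decision described in that commit message; every type and method name here is illustrative rather than the actual code in DeltaLakeMetadata.

```java
// Returns true when the DELETE/TRUNCATE attempt may be committed on top of the winning commit
boolean canReconcile(WinningCommitSummary winningCommit, DeleteOperationScope currentOperation)
{
    if (winningCommit.containsMetadataChange() || winningCommit.containsProtocolChange()) {
        // table schema or protocol changed in the meantime
        return false;
    }
    if (winningCommit.addedPartitions().stream().anyMatch(currentOperation::readsPartition)) {
        // add files committed in the meantime should have been read by the current operation
        return false;
    }
    if (winningCommit.removedPartitions().stream().anyMatch(currentOperation::readsPartition)) {
        // remove files committed in the meantime conflict with files read by the current operation
        return false;
    }
    return true;
}
```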
Description
Allow committing operations based on delete pushdown / truncate
in a concurrent context by placing these operations right after
any other previously concurrently completed write operations.
Disallow committing the operation in any of the following cases:
- table schema change has been committed in the meantime
- table protocol change has been committed in the meantime
- add files committed in the meantime should be read by the current operation
- remove files committed in the meantime conflict with the add files read by the current operation
The current changes also take into consideration the `delta.isolationLevel` table property of the Delta Lake table for DELETE/TRUNCATE operations.

Relevant example taken from the Databricks documentation regarding the distinction between `WriteSerializable` and `Serializable` isolation levels:

> For example, consider `txn1`, a long running delete and `txn2`, which blindly inserts data into the table. `txn2` and `txn1` complete and they are recorded in the order `txn2, txn1` into the history of the table. According to the history, the data inserted in `txn2` should not exist in the table. For the `Serializable` level, a reader would never see data inserted by `txn2`. However, for the `WriteSerializable` level, a reader could at some point see the data inserted by `txn2`.

A few words about the WriteSerializable isolation level, taken from the delta.io javadocs:

> This isolation level will ensure snapshot isolation consistency guarantee between write operations only. In other words, if only the write operations are considered, then there exists a serializable sequence between them that would produce the same result as seen in the table.
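Sketching, under the same caveat of illustrative (non-Trino) names, how the `delta.isolationLevel` property could change the verdict for concurrently added files: under `WriteSerializable`, blind appends committed in the meantime are tolerated, while under `Serializable` they conflict whenever the current operation would have read them.

```java
enum DeltaIsolationLevel { SERIALIZABLE, WRITE_SERIALIZABLE }

boolean concurrentlyAddedFilesConflict(
        DeltaIsolationLevel isolationLevel,      // parsed from delta.isolationLevel
        boolean winningCommitIsBlindAppend,      // e.g. a plain INSERT that read nothing
        boolean currentOperationReadsAddedFiles) // the DELETE/TRUNCATE would have scanned them
{
    if (isolationLevel == DeltaIsolationLevel.WRITE_SERIALIZABLE && winningCommitIsBlindAppend) {
        // WriteSerializable lets the blind append "reorder" before the delete in the table history
        return false;
    }
    return currentOperationReadsAddedFiles;
}
```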
Additional context and related issues
INSERT scaffolding PRs:
- INSERT in Delta Lake #18506

Depends on the fix from #21330
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text: