Fail Iceberg queries against v2 tables with row level deletes #8450
Conversation
Force-pushed 1db410a to 125bcd6
Force-pushed 125bcd6 to 73f19ae
@@ -247,7 +247,14 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
            throw new UnknownTableTypeException(tableName);
        }

        org.apache.iceberg.Table table = getIcebergTable(session, hiveTable.get().getSchemaTableName());
        TableMetadata metadata = tableMetadataCache.computeIfAbsent(
Why not change getIcebergTable itself if the goal is to prevent working with v2 tables?
Good point. I'm not sure why I had it set in my head that this needed to go here. Thanks
@@ -87,6 +89,11 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
        // The predicate here is used by readers for predicate push down at reader level,
        // so when we do not use residual expression, we are just wasting CPU cycles
        // on reader side evaluating a condition that we know will always be true.
        if (!task.deletes().isEmpty()) {
IMO this should be the only change in this PR.
Disallowing v2 tables even if there are no row-level deletes applied sounds a bit extreme.
Yeah - I agree. I would very much prefer to only fail tables with delete markers.
I guess this is more tricky to write a test for, though. Does Spark already support row-level deletes? If so, we can use the PT env we already have, I think.
I guess my thought is that with the spec not being final there's a chance other breaking changes will get added that this change won't catch. I don't know if there's still active development going on with the spec though.
It makes sense to let other v2 tables go through. We can probably add Spark compatibility tests against the two spec versions to discover breaking issues, if any, once v2 is final.
On a quick look at the Iceberg code, I think we should be able to leverage the table.newRowDelta API for testing. This indicates that adding extensions should allow performing row-level deletes with Spark 3, but I haven't verified it myself. cc @electrum if you've more context around the status of row-delete writes through other engines.
I recommend failing at planning time if any deletes are found.
Right now, the library requires you to manually update a table to v2, but 0.11.0 will successfully write deltas to such a table. The only engine that has support is Flink, though. So to find v2 tables right now, someone would have to manually update a table to accept deltas and then use either Flink or encode deltas programmatically and commit them to the table.
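For context, a rough sketch of what "encode deltas programmatically and commit them" could look like through the table.newRowDelta API mentioned above. This is not code from this PR; the path, size, and record count are illustrative assumptions, and the delete file itself is assumed to be already written out:

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.Table;

class RowDeltaSketch
{
    // Commits a position-delete file to a v2 table; afterwards, scans of the
    // table surface the file via FileScanTask#deletes().
    static void commitPositionDeletes(Table table)
    {
        DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec())
                .ofPositionDeletes()
                .withPath("/tmp/deletes-00000.parquet") // assumed path to an already-written delete file
                .withFileSizeInBytes(1024) // assumed size
                .withRecordCount(1) // assumed number of delete records
                .withFormat(FileFormat.PARQUET)
                .build();
        table.newRowDelta()
                .addDeletes(deleteFile)
                .commit();
    }
}

Committing deletes directly like this is one way a test could produce scan tasks with a non-empty deletes() list without depending on another engine.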
Force-pushed 87982c3 to a8c9c57
Force-pushed a8c9c57 to 914bd9a
Thanks for the input, updated the changes to just fail for table scans that include deletes. I did have to change the …
@@ -71,8 +80,21 @@ public ConnectorSplitSource getSplits(

        // TODO Use residual. Right now there is no way to propagate residual to Trino but at least we can
        // propagate it at split level so the parquet pushdown can leverage it.
        IcebergSplitSource splitSource = new IcebergSplitSource(tableScan.planTasks());
        ImmutableList.Builder<FileScanTask> fileScanTasks = ImmutableList.builder();
This change materializes splits eagerly during planning, while previously (IIUC) they were listed lazily as execution went along.
I would prefer not to change that, and instead fail only when we encounter a file that contains deletes as we go.
That may be later in the query, which is a somewhat worse experience for the user trying to read a v2 table with deletes, but I would argue that we should not optimize for nicer handling of unsupported tables if that hinders the typical case.
I agree with this. Iterating through the result of planTasks
will perform planning separately just to validate there are no delete files, which is almost certainly something you don't want to do.
Gotcha, thanks. I didn't have a good sense of how much work was being done lazily in the CloseableIterable<CombinedScanTask>
. I'll switch it back.
                .map(CombinedScanTask::files)
                .flatMap(Collection::stream)
                .iterator();
        this.fileScanIterator = requireNonNull(fileScanTasks, "fileScanTasks is null").iterator();
@phd3, it looks odd to me that the code here was previously getting all of the file scan tasks, which basically defeats the purpose of using the combined task iterable. Can you give me some background on what's going on here?
That said, I think that this is where the check would go in the older code:
this.fileScanIterator = Streams.stream(combinedScanIterable)
        .map(CombinedScanTask::files)
        .flatMap(Collection::stream)
        .map(file -> {
            if (!file.deletes().isEmpty()) {
                throw new TrinoException(...);
            }
            return file;
        })
        .iterator();
Do you mean we should be assigning the whole CombinedScanTask
as one split instead of undoing the balancing work by getting individual files from it?
In the current implementation, while some FileScanTasks
can be much smaller, I think the usage of CombinedScanIterable
helps provide an upper bound on split size, as opposed to the alternative planFiles
. @electrum may know more if there was a previous discussion around this.
(If this is indeed what you're referring to:) the number of pending splits is also considered while assigning new splits to Trino's tasks. So I think the imbalance caused by ignoring the CombinedScanTask grouping would be diluted, since tasks finishing small splits would become eligible for being assigned new ones faster.
Initially the Iceberg connector's data reader heavily reused Hive connector code, which is likely another reason we kept the same model of scanning one part of one file per Trino split. (Please ignore this if I'm way off base here from your point. :))
Yes, I think you understood what I was asking. Using the combined tasks means that files will be both split and combined. It seems a little weird to undo combining small tasks into larger ones. You're right that this does provide an upper bound on the split sizes, but I don't know why you would want to remove combining splits across files.
Maybe we can move this over to an issue? It would require changing the split format, so I'd rather do it in a separate PR.
Currently, one FileScanTask corresponds to one Trino split, and every Trino task keeps getting assigned more splits based on available capacity. IIUC, assigning a CombinedScanTask to a split means that Trino's Iceberg reader would need to scan multiple files for every split, which is a bit different from how things are modeled currently.
Okay, so it sounds like Trino handles split combining on its own dynamically. Is that right? If so, then this makes sense. You could also avoid some work by not combining, but it probably doesn't matter much.
Yes. I'd guess that using combined tasks wouldn't provide a lot of improvement over the current approach.
> You could also avoid some work by not combining
I didn't see any TableScan method that avoids combining but still divides huge files, though. I guess we'd need to do that in Trino after using planFiles.
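To make the planFiles/planTasks distinction in this thread concrete, here is a hedged sketch of the two Iceberg planning APIs (the class and method names come from the Iceberg library; the printing is only for illustration):

import java.io.IOException;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class ScanPlanningSketch
{
    static void plan(Table table) throws IOException
    {
        // planFiles(): one FileScanTask per data file; small files are not
        // combined and, as noted above, huge files are not divided either.
        try (CloseableIterable<FileScanTask> files = table.newScan().planFiles()) {
            files.forEach(task -> System.out.println(task.file().path()));
        }

        // planTasks(): file tasks are split to a target size and bin-packed
        // into CombinedScanTasks, which is what bounds the per-task work.
        try (CloseableIterable<CombinedScanTask> tasks = table.newScan().planTasks()) {
            tasks.forEach(task -> System.out.println(task.files().size() + " file slices"));
        }
    }
}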
Force-pushed 914bd9a to 10f8a31
        HdfsConfiguration configuration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(config), ImmutableSet.of());
        hdfsEnvironment = new HdfsEnvironment(configuration, config, new NoHdfsAuthentication());

        metastore = new FileHiveMetastore(
It'd be better to create the directory path here and use it to initialize both the queryRunner and everything else in the tests. Also, there's a createTestingFileHiveMetastore helper to cut down on the boilerplate.
I ran into one problem with this: the NodeVersion used in createTestingFileHiveMetastore doesn't match the one used in TestingTrinoServer, so queries would fail with an incompatible version error in FileHiveMetastore. I changed the versions to match in the first commit as a fix.
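For reference, a minimal sketch of the setup being suggested here, creating the directory once and sharing it between the query runner and the metastore. The directory layout is an illustrative assumption, and createTestingFileHiveMetastore is the helper named above:

import java.io.File;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.testing.DistributedQueryRunner;
import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;

class MetastoreSetupSketch
{
    static HiveMetastore createMetastore(DistributedQueryRunner queryRunner)
    {
        // Derive the catalog directory from the query runner's data dir so the
        // test and the runner agree on one path (layout assumed here).
        File catalogDir = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data").toFile();
        return createTestingFileHiveMetastore(catalogDir);
    }
}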
Thanks @alexjo2144, just added some comments w.r.t. the test.
If FileHiveMetastore#createTestingFileHiveMetastore is used along with TestingTrinoServer the version check would fail.
Force-pushed 10f8a31 to d3566c8
Comments addressed
Force-pushed d3566c8 to ef2504f
LGTM % some comments.
This will cause non-deterministic success/failure for LIMIT queries, but I think that's still better than the alternative of eager task loading.
@@ -59,6 +61,9 @@ public IcebergSplitSource(CloseableIterable<CombinedScanTask> combinedScanIterab
        Iterator<FileScanTask> iterator = limit(fileScanIterator, maxSize);
        while (iterator.hasNext()) {
            FileScanTask task = iterator.next();
            if (!task.deletes().isEmpty()) {
                throw new TrinoException(NOT_SUPPORTED, "Iceberg tables with delete files are not supported");
Missed this earlier: adding the SchemaTableName to the error message here would be good.
The v2 specification is not final but some writers are already adding support for it. For now, ensure that any tables with the new row level delete format cannot be queried.
Force-pushed ef2504f to 2295b40
Updated, thanks for the review
    {
        String tableName = "test_v2_table_read" + randomTableSuffix();
        assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
        updateTableToV2(tableName);
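For readers following along, a plausible sketch of what a helper like updateTableToV2 might do, using Iceberg's metadata-upgrade API. How the BaseTable is loaded from the catalog is elided and assumed here:

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

class UpgradeSketch
{
    static void upgradeToV2(BaseTable table)
    {
        TableOperations operations = table.operations();
        TableMetadata current = operations.current();
        // upgradeToFormatVersion(2) returns new metadata marked as format v2;
        // committing it makes the table eligible for row-level deletes.
        operations.commit(current, current.upgradeToFormatVersion(2));
    }
}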
As a follow-up, I would love to see a product test with Spark Iceberg for this too.
We can also add the version info in one of our system tables to make it easier to verify that Spark indeed writes v2.
Merged, thanks @alexjo2144!
The v2 specification is not final but some writers are already adding support for it. For now, ensure that any tables with the new format cannot be queried.
#7226