-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support sorted writes in the Iceberg connector #14891
Conversation
Note to self: rework the "Allow updating the sorted_by Iceberg table property" commit to give @osscm author credit. |
Add compatibility tests with spark. |
I'd appreciate having a demo test that emphasises on the fact that the number of files being read is smaller when working with I don't see |
assertThat(catalog.listTables(SESSION, Optional.empty())).contains(schemaTableName); | ||
|
||
Table icebergTable = catalog.loadTable(SESSION, schemaTableName); | ||
assertEquals(icebergTable.name(), quotedTableName(schemaTableName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assertions related to the columns were already made in io.trino.plugin.iceberg.catalog.BaseTrinoCatalogTest#testCreateTable
. Are they relevant here as well?
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SortFields.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Outdated
Show resolved
Hide resolved
return writerSortBufferSize; | ||
} | ||
|
||
@Config("hive.writer-sort-buffer-size") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pre-existing: It would be beneficial to have the purpose of this property documented (in the code).
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java
Outdated
Show resolved
Hide resolved
|
||
dropTable("test_sorted_with_partition_table"); | ||
assertUpdate("INSERT INTO " + tableName + " VALUES (true, 1, 5), (false, 2, 4), (true, 3, 3), (false, 4, 2), (true, 5, 1)", 5); | ||
assertQuery("SELECT * FROM " + tableName, "VALUES (true, 1, 5), (false, 2, 4), (true, 3, 3), (false, 4, 2), (true, 5, 1)"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do we showcase in this test?
I think the test would pass also without sorted_by
property specified on the table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gentle reminder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new reminder :)
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
Show resolved
Hide resolved
ca6d0fb
to
105b42b
Compare
Still working on test cases and Marius' comments, but added support for sorting during updates and during |
void verifyFileIsSorted(String path, String sortColumnName) | ||
{ | ||
Comparable previousMax = null; | ||
try (ParquetFileReader parquetReader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), ConfigurationInstantiator.newEmptyConfiguration()))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naive question: Why not read all the rows from the given parquet file and actually verify that any given row follows the sort order contract in respect to the previous row?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm, we could it just sounds more complex to set up the test.
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
Outdated
Show resolved
Hide resolved
37d76ab
to
e1191f4
Compare
2362877
to
b0d4cf0
Compare
@findinpath @findepi @ebyhr I think this is ready for review. Please take a look when you get a chance |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % comments
plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
Show resolved
Hide resolved
|
||
private static void parseSortField(SortOrderBuilder<?> builder, String field) | ||
{ | ||
boolean matched = tryMatch(field, IDENTITY_ASC_NULLS_FIRST_PATTERN, match -> builder.asc(fromIdentifierToColumn(match.group(1).trim()), NullOrder.NULLS_FIRST)) || |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we extract fromIdentifierToColumn
to a shared utility class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably, but I'd rather leave it until we add transform support and clean it up then. I think there are a few other things that should be central between both partition and sort transform parsing.
|
||
dropTable("test_sorted_with_partition_table"); | ||
assertUpdate("INSERT INTO " + tableName + " VALUES (true, 1, 5), (false, 2, 4), (true, 3, 3), (false, 4, 2), (true, 5, 1)", 5); | ||
assertQuery("SELECT * FROM " + tableName, "VALUES (true, 1, 5), (false, 2, 4), (true, 3, 3), (false, 4, 2), (true, 5, 1)"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gentle reminder
return sortedWritingEnabled; | ||
} | ||
|
||
@Config("iceberg.sorted-writing-enabled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a (temporary) fallback in case that the sorted writing does not work as expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can also be useful if your writes are very small (streaming ingest, for example) such that sorting them would be a waste of time until they are compacted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Writes being small or not sounds like a query-dependent, so it warrants session toggle more than a catalog config.
Also, can a writer detect that written data is small and not worth sorting?
OTOH, sorting small amount of data sounds like not a big deal (as long as it happens fully in memory and doesn't add latency), so why would we care?
The config should still remain as a kill switch.
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
Outdated
Show resolved
Hide resolved
b0d4cf0
to
4cdd588
Compare
e5bbfdc
to
762cfee
Compare
Addressed doc comments. I do need to fix the tests though. I moved some to the smoke tests so that they run against all supported file systems but they're not working yet. |
c39baaa
to
c3f4fc1
Compare
33590e7
to
0f09920
Compare
/test-with-secrets sha=0f09920b81b690612b026fcfd6e7c4cb252951ee |
The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/4289597275 |
Run with secrets failed: #13199 (reopened) |
Co-authored-by: Alex Jo <jo.alex2144@gmail.com>
0f09920
to
f9d5336
Compare
rebased to resolve a conflict |
f9d5336
to
b7adc4c
Compare
thanks! |
Description
Support sorting files during inserts to the Iceberg connector. This reuses the SortingFileWriter from the Hive connector.
Non-technical explanation
Sorting enables better performance during selective read queries, where a small range of values is needed from a high carnality column.
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text: