Respect target-max-file-size in Iceberg #10957
Conversation
Force-pushed from 9adc6b0 to 277818f
Force-pushed from 5f601a1 to 32244aa
Resolved review threads on:
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java (outdated)
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java (outdated)
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java (outdated)
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java (outdated)
Force-pushed from 3ada9c2 to a47b881
@@ -89,6 +89,11 @@ public long getWrittenBytes()
        return parquetWriter.getWrittenBytes();
    }

+   public long getBufferedBytes()
Why do we want to use it for Iceberg, but not for Hive?
As far as I saw, Hive does it a little differently, but it also takes buffered bytes into account, just not directly.
I am looking at
    if (bucketFunction != null || isTransactional || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
and indeed ORC includes buffered bytes, and Parquet does not (#10957 (comment)).
Still, we don't need this to be different for Hive vs Iceberg, right?
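For reference, a minimal sketch (not the actual Trino code) of what a uniform rollover check could look like if buffered bytes were counted explicitly; the FileWriter accessors are the ones discussed in this thread:

// A sketch only: counts both flushed and still-buffered bytes, so ORC and
// Parquet writers would be treated uniformly by the page sink.
private static boolean shouldRollOver(FileWriter writer, OptionalLong targetMaxFileSize)
{
    long producedBytes = writer.getWrittenBytes() + writer.getBufferedBytes();
    return producedBytes >= targetMaxFileSize.orElse(Long.MAX_VALUE);
}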
OK, sorry for the wrong explanation. Hive uses implementations of FileWriter. There are many of them, and as far as I understand it is hard for some of them to measure even already-written bytes. For example, here:
trino/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java, lines 135 to 157 in 6588a9e:
public long getWrittenBytes()
{
    if (recordWriter instanceof ExtendedRecordWriter) {
        return ((ExtendedRecordWriter) recordWriter).getWrittenBytes();
    }
    if (committed) {
        if (finalWrittenBytes != -1) {
            return finalWrittenBytes;
        }
        try {
            finalWrittenBytes = path.getFileSystem(conf).getFileStatus(path).getLen();
            return finalWrittenBytes;
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
    // there is no good way to get this when RecordWriter is not yet committed
    return 0;
}
I had no idea how to implement getWrittenBytes and getBufferedBytes for some FileWriter implementations, so I only focused on those used by Iceberg.
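One way this could be handled without touching every implementation -- a hedged sketch, not necessarily what this PR does -- is a default method on the FileWriter interface, so writers that cannot measure buffered data (like RecordFileWriter above) simply report 0:

public interface FileWriter
{
    long getWrittenBytes();

    // Sketch: default for writers that cannot measure their buffered data,
    // mirroring the "return 0" fallback in RecordFileWriter.getWrittenBytes()
    default long getBufferedBytes()
    {
        return 0;
    }
}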
@Override
public long getBufferedBytes()
{
    return 0; // buffered bytes are already counted in written bytes by orcFileWriter
Sounds like an undesired inconsistency. Would it be possible to fix it instead?
I will try to do that in a separate PR after this one, OK?
This might be as simple as adding parquetWriter.getBufferedBytes() at
trino/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java, line 89 in 9ab09e9:
    return parquetWriter.getWrittenBytes();
As a followup we may want to document or rename the method.
I would prefer to fix the inconsistency rather than introduce a method, if the end result is that the method is going to be removed soonish.
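For illustration, the suggested fix would presumably amount to something like this in ParquetFileWriter (a sketch of the idea, not a verified patch):

@Override
public long getWrittenBytes()
{
    // include bytes still buffered by the Parquet writer, making Parquet
    // consistent with ORC, which already counts them
    return parquetWriter.getWrittenBytes() + parquetWriter.getBufferedBytes();
}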
Yes, it may be that simple, but it will change the behaviour of getWrittenBytes, and I am not sure what depends on that. For example, SortingFileWriter depends on it. I'd prefer to do it as a separate PR to be easily able to see what is broken (if anything) by this change. @findepi Do you agree?
> Yes, it may be that simple, but it will change the behaviour of getWrittenBytes, and I am not sure what depends on that.

For Parquet -- yes; for ORC -- no.

> SortingFileWriter depends on it.

If this is a problem, we already have a problem (for ORC), right?
@@ -345,4 +354,9 @@ public static boolean isProjectionPushdownEnabled(ConnectorSession session)
    {
        return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class);
    }

+   public static Optional<Long> getTargetMaxFileSize(ConnectorSession session)
I asked you to have Optional here, but actually ... -- #10978
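For context, a plausible shape of the accessor as it appears in the diff above (a sketch only; the DataSize property type is an assumption based on similar Trino session properties):

public static Optional<Long> getTargetMaxFileSize(ConnectorSession session)
{
    // null when the session property is unset; otherwise convert to bytes
    DataSize size = session.getProperty(TARGET_MAX_FILE_SIZE, DataSize.class);
    return Optional.ofNullable(size).map(DataSize::toBytes);
}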
    .getMaterializedRows()
    // as target_max_file_size is set to quite a low value, it can happen that created files are bigger,
    // so just to be safe we check that they are not much bigger
    .forEach(row -> assertThat((Long) row.getField(0)).isLessThan(maxSize.toBytes() * 3));
Use "is between". Let's verify no empty files got created.
Force-pushed from a47b881 to f663c7a
Can you please change the description of the PR to the new Trino PR template?
And where is that?
Force-pushed from f663c7a to 5063c0e
@findepi I pushed a new version without
Force-pushed from 5063c0e to 5b100ef
@@ -161,7 +161,7 @@ private void writeChunk(Page page)
            flush();
            initColumnWriters();
            rows = 0;
-           bufferedBytes = columnWriters.stream().mapToLong(ColumnWriter::getBufferedBytes).sum();
+           bufferedBytes = columnWriters.stream().mapToLong(ColumnWriter::getRetainedBytes).sum();
Why the change here?
To make it behave the same way as OrcWriter; and it actually makes sense to use retainedBytes here.
getBufferedBytes seems to be "how much data I have";
getRetainedBytes seems to be "how much memory I have allocated";
at least this is what I see in FixedLenByteArrayPlainValuesWriter, which I picked as an example impl.
I think we should use getBufferedBytes (does OrcWriter need a change?).
@skrzypo987 do you know better?
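To make the distinction concrete, a toy example (purely illustrative, not Parquet's actual writer):

final class ToyValuesWriter
{
    private byte[] buffer = new byte[1024]; // allocated capacity
    private int used;                       // bytes of real data in the buffer

    long getBufferedBytes()
    {
        return used; // "how much data I have" -- what a flush would write
    }

    long getRetainedBytes()
    {
        return buffer.length; // "how much memory I have allocated"
    }
}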
OK, I can change it to getBufferedBytes; should I also change it for OrcWriter in this PR?
getBufferedBytes is the size that will actually be written.
> OK, I can change it to getBufferedBytes; should I also change it for OrcWriter in this PR?

Can be separate.
@@ -69,6 +70,7 @@
    private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";
    private static final String STATISTICS_ENABLED = "statistics_enabled";
    private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
+   private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
Add an empty line before the list.
assertUpdate(session, createTableSql, 100000);
assertThat(query(format("SELECT count(*) FROM %s", tableName))).matches("VALUES (BIGINT '100000')");
List<String> updatedFiles = getActiveFiles(tableName);
assertThat(updatedFiles.size()).isGreaterThan(3);
Can we expect even more? Like 10?
Using default settings, we should not.
It's not under default settings. You force a small file size, so how many files are we getting?
Currently there are around 30; I can change this value to 10.
Thanks, let's have 10.
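So the assertion above would presumably become:

assertThat(updatedFiles.size()).isGreaterThan(10);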
        .build();

assertUpdate(session, createTableSql, 100000);
assertThat(query(format("SELECT count(*) FROM %s", tableName))).matches("VALUES (BIGINT '100000')");
The parentheses in VALUES are redundant.
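That is, the matcher can drop them (sketch of the cleaned-up assertion):

assertThat(query(format("SELECT count(*) FROM %s", tableName))).matches("VALUES BIGINT '100000'");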
Force-pushed from 5b100ef to 7696587
The failing suite is not related; looks like there was an issue with the container.
Description

General information

Related issues, pull requests, and links

Documentation

(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

(x) No release notes entries required.
( ) Release notes entries required with the following suggested text: