
[fix](iceberg)Fix count(*) error with dangling delete problem #44039

Merged · 3 commits · Nov 18, 2024
12 changes: 10 additions & 2 deletions docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl
@@ -25,9 +25,17 @@ start-thriftserver.sh --driver-java-options "-Dderby.system.home=/tmp/derby"



-ls /mnt/scripts/create_preinstalled_scripts/*.sql | xargs -n 1 -I {} bash -c '
+ls /mnt/scripts/create_preinstalled_scripts/iceberg/*.sql | xargs -n 1 -I {} bash -c '
 START_TIME=$(date +%s)
-spark-sql --master spark://doris--spark-iceberg:7077 -f {}
+spark-sql --master spark://doris--spark-iceberg:7077 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -f {}
 END_TIME=$(date +%s)
 EXECUTION_TIME=$((END_TIME - START_TIME))
 echo "Script: {} executed in $EXECUTION_TIME seconds"
 '
+
+ls /mnt/scripts/create_preinstalled_scripts/paimon/*.sql | xargs -n 1 -I {} bash -c '
+START_TIME=$(date +%s)
+spark-sql --master spark://doris--spark-iceberg:7077 --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions -f {}
+END_TIME=$(date +%s)
+EXECUTION_TIME=$((END_TIME - START_TIME))
+echo "Script: {} executed in $EXECUTION_TIME seconds"
@@ -0,0 +1,21 @@
+use demo.test_db;
+
+drop table if exists dangling_delete_after_write;
+create table dangling_delete_after_write (
+    id BIGINT NOT NULL,
+    val STRING)
+USING iceberg
+TBLPROPERTIES (
+    'format' = 'iceberg/parquet',
+    'format-version' = '2',
+    'identifier-fields' = '[id]',
+    'upsert-enabled' = 'true',
+    'write.delete.mode' = 'merge-on-read',
+    'write.parquet.compression-codec' = 'zstd',
+    'write.update.mode' = 'merge-on-read',
+    'write.upsert.enabled' = 'true');
+
+insert into dangling_delete_after_write values(1, 'abd');
+update dangling_delete_after_write set val = 'def' where id = 1;
+call demo.system.rewrite_data_files(table => 'demo.test_db.dangling_delete_after_write', options => map('min-input-files', '1'));
+insert into dangling_delete_after_write values(2, 'xyz');
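The script above reproduces the bug: the merge-on-read update writes delete files, and `rewrite_data_files` compacts the data files without cleaning those deletes up, which can leave a "dangling delete" still counted in the snapshot summary. As a rough illustration (not part of this PR), the leftover counters can be inspected with the Iceberg Java API; the catalog type and URI below are assumptions for the docker environment:

import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

public class DanglingDeleteCheck {
    public static void main(String[] args) {
        // Assumed catalog config; any configured Iceberg catalog exposes the same summary.
        HiveCatalog catalog = new HiveCatalog();
        catalog.initialize("demo", Map.of("uri", "thrift://doris--spark-iceberg:9083"));
        Table table = catalog.loadTable(TableIdentifier.of("test_db", "dangling_delete_after_write"));

        Snapshot snapshot = table.currentSnapshot();
        Map<String, String> summary = snapshot.summary();
        // After rewrite_data_files, "total-position-deletes" can stay above zero even
        // though the delete no longer applies to any live data file.
        System.out.println("total-records: " + summary.get("total-records"));
        System.out.println("total-position-deletes: " + summary.get("total-position-deletes"));
        System.out.println("total-equality-deletes: " + summary.get("total-equality-deletes"));
    }
}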
@@ -384,13 +384,15 @@ private long getCountFromSnapshot() {
             return 0;
         }
 
+        // `TOTAL_POSITION_DELETES` must also be 0, because `rewrite_data_files` can
+        // leave 'dangling deletes' that would make the summary-based count wrong.
+        // ref: https://iceberg.apache.org/docs/nightly/spark-procedures/#rewrite_position_delete_files
         Map<String, String> summary = snapshot.summary();
-        if (summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) {
-            return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS))
-                    - Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES));
-        } else {
+        if (!summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")
+                || !summary.get(IcebergUtils.TOTAL_POSITION_DELETES).equals("0")) {
             return -1;
         }
+        return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS));
     }
 
     @Override
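The shape of the fix, as a self-contained sketch: any non-zero delete counter now disables the pushdown, so count(*) falls back to a real scan instead of trusting `total-records - total-position-deletes`. The summary keys below are assumed to match the `IcebergUtils` constants, and the sample values are illustrative:

import java.util.Map;

final class CountPushdownSketch {
    // Iceberg snapshot summary keys; assumed to be what IcebergUtils wraps.
    static final String TOTAL_RECORDS = "total-records";
    static final String TOTAL_POSITION_DELETES = "total-position-deletes";
    static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";

    // Mirrors the patched getCountFromSnapshot(): -1 means "do not push down".
    static long countFromSummary(Map<String, String> summary) {
        if (!"0".equals(summary.get(TOTAL_EQUALITY_DELETES))
                || !"0".equals(summary.get(TOTAL_POSITION_DELETES))) {
            return -1; // deletes present, possibly dangling: the summary cannot be trusted
        }
        return Long.parseLong(summary.get(TOTAL_RECORDS));
    }

    public static void main(String[] args) {
        // Illustrative values for a snapshot carrying a dangling position delete.
        Map<String, String> afterRewrite = Map.of(
                TOTAL_RECORDS, "3",
                TOTAL_POSITION_DELETES, "1",
                TOTAL_EQUALITY_DELETES, "0");
        System.out.println(countFromSummary(afterRewrite)); // prints -1: fall back to a scan
    }
}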
@@ -23,3 +23,6 @@
 -- !q08 --
 1000
 
+-- !q09 --
+2
+
@@ -69,7 +69,7 @@ suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external
         }
         explain {
             sql("""${sqlstr4}""")
-            contains """pushdown agg=COUNT (1000)"""
+            contains """pushdown agg=COUNT (-1)"""
         }
 
         // don't use push down count
@@ -97,6 +97,17 @@ suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external
             contains """pushdown agg=NONE"""
         }
 
+        // There are `dangling delete` files left after the rewrite
+        sql """ set enable_count_push_down_for_external_table=true; """
+        sqlstr5 = """ select count(*) from ${catalog_name}.test_db.dangling_delete_after_write; """
+
+        qt_q09 """${sqlstr5}"""
+
+        explain {
+            sql("""${sqlstr5}""")
+            contains """pushdown agg=COUNT (-1)"""
+        }
+
     } finally {
         sql """ set enable_count_push_down_for_external_table=true; """
         sql """drop catalog if exists ${catalog_name}"""