Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix iceberg optimize tests #18415

Merged
merged 1 commit into from
Jul 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -4949,7 +4949,7 @@ public void testOptimize(int formatVersion)
int workerCount = getQueryRunner().getNodeCount();

// optimize an empty table
assertQuerySucceeds("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
assertThat(getActiveFiles(tableName)).isEmpty();

assertUpdate("INSERT INTO " + tableName + " VALUES (11, 'eleven')", 1);
Expand All @@ -4964,7 +4964,8 @@ public void testOptimize(int formatVersion)
// Verify we have sufficiently many test rows with respect to worker count.
.hasSizeGreaterThan(workerCount);

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
.matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')");
List<String> updatedFiles = getActiveFiles(tableName);
Expand All @@ -4976,7 +4977,8 @@ public void testOptimize(int formatVersion)
.containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles));

// optimize with low file size threshold, nothing should change
computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE (file_size_threshold => '33B')");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE (file_size_threshold => '33B')");
assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
.matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')");
assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles);
Expand Down Expand Up @@ -5006,7 +5008,7 @@ public void testOptimizeForPartitionedTable(int formatVersion)
String tableName = "test_repartitiong_during_optimize_" + randomNameSuffix();
assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (format_version = " + formatVersion + ", partitioning = ARRAY['key'])");
// optimize an empty table
assertQuerySucceeds(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
assertQuerySucceeds(withSingleWriterPerTask(session), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 2)", 1);
Expand All @@ -5022,7 +5024,8 @@ public void testOptimizeForPartitionedTable(int formatVersion)
List<String> initialFiles = getActiveFiles(tableName);
assertThat(initialFiles).hasSize(10);

computeActual(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(session), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

assertThat(query(session, "SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
.matches("VALUES (BIGINT '55', VARCHAR 'one one one one one one one three two two')");
Expand Down Expand Up @@ -5081,10 +5084,11 @@ public void testOptimizeTimePartitionedTable(String dataType, String partitionin
.isGreaterThanOrEqualTo(5);

assertUpdate(
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
// Use UTC zone so that DATE and TIMESTAMP WITH TIME ZONE comparisons align with partition boundaries.
Session.builder(getSession())
withSingleWriterPerTask(Session.builder(getSession())
.setTimeZoneKey(UTC_KEY)
.build(),
.build()),
"ALTER TABLE " + tableName + " EXECUTE optimize WHERE p >= " + optimizeDate);

assertThat((long) computeScalar("SELECT count(DISTINCT \"$path\") FROM " + tableName + " WHERE p < " + optimizeDate))
Expand All @@ -5096,9 +5100,10 @@ public void testOptimizeTimePartitionedTable(String dataType, String partitionin

// Verify that WHERE CAST(p AS date) ... form works in non-UTC zone
assertUpdate(
Session.builder(getSession())
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
withSingleWriterPerTask(Session.builder(getSession())
.setTimeZoneKey(getTimeZoneKey("Asia/Kathmandu"))
.build(),
.build()),
"ALTER TABLE " + tableName + " EXECUTE optimize WHERE CAST(p AS date) >= " + optimizeDate);

// Table state shouldn't change substantially (but files may be rewritten)
Expand Down Expand Up @@ -5141,7 +5146,8 @@ public void testOptimizeTableAfterDeleteWithFormatVersion2()
"SELECT summary['total-delete-files'] FROM \"" + tableName + "$snapshots\" WHERE snapshot_id = " + getCurrentSnapshotId(tableName),
"VALUES '1'");

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

List<String> updatedFiles = getActiveFiles(tableName);
assertThat(updatedFiles)
Expand Down Expand Up @@ -5174,7 +5180,8 @@ public void testOptimizeCleansUpDeleteFiles()
List<String> allDataFilesAfterDelete = getAllDataFilesFromTableDirectory(tableName);
assertThat(allDataFilesAfterDelete).hasSize(6);

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE regionkey = 4");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE regionkey = 4");
computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')");
computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')");

Expand All @@ -5190,7 +5197,8 @@ public void testOptimizeCleansUpDeleteFiles()
assertThat(query("SELECT * FROM " + tableName))
.matches("SELECT * FROM nation WHERE nationkey != 7");

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')");
computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')");

Expand Down