Skip to content

Commit

Permalink
Fix iceberg optimize tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav8297 authored and raunaqmorarka committed Jul 26, 2023
1 parent b9256c3 commit 5240056
Showing 1 changed file with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4949,7 +4949,7 @@ public void testOptimize(int formatVersion)
int workerCount = getQueryRunner().getNodeCount();

// optimize an empty table
assertQuerySucceeds("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
assertThat(getActiveFiles(tableName)).isEmpty();

assertUpdate("INSERT INTO " + tableName + " VALUES (11, 'eleven')", 1);
Expand All @@ -4964,7 +4964,8 @@ public void testOptimize(int formatVersion)
// Verify we have sufficiently many test rows with respect to worker count.
.hasSizeGreaterThan(workerCount);

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
.matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')");
List<String> updatedFiles = getActiveFiles(tableName);
Expand All @@ -4976,7 +4977,8 @@ public void testOptimize(int formatVersion)
.containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles));

// optimize with low file size threshold, nothing should change
computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE (file_size_threshold => '33B')");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE (file_size_threshold => '33B')");
assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
.matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')");
assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles);
Expand Down Expand Up @@ -5006,7 +5008,7 @@ public void testOptimizeForPartitionedTable(int formatVersion)
String tableName = "test_repartitiong_during_optimize_" + randomNameSuffix();
assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (format_version = " + formatVersion + ", partitioning = ARRAY['key'])");
// optimize an empty table
assertQuerySucceeds(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
assertQuerySucceeds(withSingleWriterPerTask(session), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 2)", 1);
Expand All @@ -5022,7 +5024,8 @@ public void testOptimizeForPartitionedTable(int formatVersion)
List<String> initialFiles = getActiveFiles(tableName);
assertThat(initialFiles).hasSize(10);

computeActual(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(session), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

assertThat(query(session, "SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
.matches("VALUES (BIGINT '55', VARCHAR 'one one one one one one one three two two')");
Expand Down Expand Up @@ -5081,10 +5084,11 @@ public void testOptimizeTimePartitionedTable(String dataType, String partitionin
.isGreaterThanOrEqualTo(5);

assertUpdate(
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
// Use UTC zone so that DATE and TIMESTAMP WITH TIME ZONE comparisons align with partition boundaries.
Session.builder(getSession())
withSingleWriterPerTask(Session.builder(getSession())
.setTimeZoneKey(UTC_KEY)
.build(),
.build()),
"ALTER TABLE " + tableName + " EXECUTE optimize WHERE p >= " + optimizeDate);

assertThat((long) computeScalar("SELECT count(DISTINCT \"$path\") FROM " + tableName + " WHERE p < " + optimizeDate))
Expand All @@ -5096,9 +5100,10 @@ public void testOptimizeTimePartitionedTable(String dataType, String partitionin

// Verify that WHERE CAST(p AS date) ... form works in non-UTC zone
assertUpdate(
Session.builder(getSession())
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
withSingleWriterPerTask(Session.builder(getSession())
.setTimeZoneKey(getTimeZoneKey("Asia/Kathmandu"))
.build(),
.build()),
"ALTER TABLE " + tableName + " EXECUTE optimize WHERE CAST(p AS date) >= " + optimizeDate);

// Table state shouldn't change substantially (but files may be rewritten)
Expand Down Expand Up @@ -5141,7 +5146,8 @@ public void testOptimizeTableAfterDeleteWithFormatVersion2()
"SELECT summary['total-delete-files'] FROM \"" + tableName + "$snapshots\" WHERE snapshot_id = " + getCurrentSnapshotId(tableName),
"VALUES '1'");

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

List<String> updatedFiles = getActiveFiles(tableName);
assertThat(updatedFiles)
Expand Down Expand Up @@ -5174,7 +5180,8 @@ public void testOptimizeCleansUpDeleteFiles()
List<String> allDataFilesAfterDelete = getAllDataFilesFromTableDirectory(tableName);
assertThat(allDataFilesAfterDelete).hasSize(6);

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE regionkey = 4");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE regionkey = 4");
computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')");
computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')");

Expand All @@ -5190,7 +5197,8 @@ public void testOptimizeCleansUpDeleteFiles()
assertThat(query("SELECT * FROM " + tableName))
.matches("SELECT * FROM nation WHERE nationkey != 7");

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
// For optimize we need to set task_writer_count to 1, otherwise it will create more than one file.
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')");
computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')");

Expand Down

0 comments on commit 5240056

Please sign in to comment.