diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 3e3a00645da9..171cb711510e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -4949,7 +4949,7 @@ public void testOptimize(int formatVersion) int workerCount = getQueryRunner().getNodeCount(); // optimize an empty table - assertQuerySucceeds("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertQuerySucceeds(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); assertThat(getActiveFiles(tableName)).isEmpty(); assertUpdate("INSERT INTO " + tableName + " VALUES (11, 'eleven')", 1); @@ -4964,7 +4964,8 @@ public void testOptimize(int formatVersion) // Verify we have sufficiently many test rows with respect to worker count. .hasSizeGreaterThan(workerCount); - computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + // For optimize we need to set task_writer_count to 1, otherwise it will create more than one file. + computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) .matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')"); List updatedFiles = getActiveFiles(tableName); @@ -4976,7 +4977,8 @@ public void testOptimize(int formatVersion) .containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles)); // optimize with low retention threshold, nothing should change - computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE (file_size_threshold => '33B')"); + // For optimize we need to set task_writer_count to 1, otherwise it will create more than one file. + computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE (file_size_threshold => '33B')"); assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) .matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')"); assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles); @@ -5006,7 +5008,7 @@ public void testOptimizeForPartitionedTable(int formatVersion) String tableName = "test_repartitiong_during_optimize_" + randomNameSuffix(); assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (format_version = " + formatVersion + ", partitioning = ARRAY['key'])"); // optimize an empty table - assertQuerySucceeds(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertQuerySucceeds(withSingleWriterPerTask(session), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 1)", 1); assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 2)", 1); @@ -5022,7 +5024,8 @@ public void testOptimizeForPartitionedTable(int formatVersion) List initialFiles = getActiveFiles(tableName); assertThat(initialFiles).hasSize(10); - computeActual(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + // For optimize we need to set task_writer_count to 1, otherwise it will create more than one file. + computeActual(withSingleWriterPerTask(session), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); assertThat(query(session, "SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) .matches("VALUES (BIGINT '55', VARCHAR 'one one one one one one one three two two')"); @@ -5081,10 +5084,11 @@ public void testOptimizeTimePartitionedTable(String dataType, String partitionin .isGreaterThanOrEqualTo(5); assertUpdate( + // For optimize we need to set task_writer_count to 1, otherwise it will create more than one file. // Use UTC zone so that DATE and TIMESTAMP WITH TIME ZONE comparisons align with partition boundaries. - Session.builder(getSession()) + withSingleWriterPerTask(Session.builder(getSession()) .setTimeZoneKey(UTC_KEY) - .build(), + .build()), "ALTER TABLE " + tableName + " EXECUTE optimize WHERE p >= " + optimizeDate); assertThat((long) computeScalar("SELECT count(DISTINCT \"$path\") FROM " + tableName + " WHERE p < " + optimizeDate)) @@ -5096,9 +5100,10 @@ public void testOptimizeTimePartitionedTable(String dataType, String partitionin // Verify that WHERE CAST(p AS date) ... form works in non-UTC zone assertUpdate( - Session.builder(getSession()) + // For optimize we need to set task_writer_count to 1, otherwise it will create more than one file. + withSingleWriterPerTask(Session.builder(getSession()) .setTimeZoneKey(getTimeZoneKey("Asia/Kathmandu")) - .build(), + .build()), "ALTER TABLE " + tableName + " EXECUTE optimize WHERE CAST(p AS date) >= " + optimizeDate); // Table state shouldn't change substantially (but files may be rewritten) @@ -5141,7 +5146,8 @@ public void testOptimizeTableAfterDeleteWithFormatVersion2() "SELECT summary['total-delete-files'] FROM \"" + tableName + "$snapshots\" WHERE snapshot_id = " + getCurrentSnapshotId(tableName), "VALUES '1'"); - computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + // For optimize we need to set task_writer_count to 1, otherwise it will create more than one file. + computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); List updatedFiles = getActiveFiles(tableName); assertThat(updatedFiles) @@ -5174,7 +5180,8 @@ public void testOptimizeCleansUpDeleteFiles() List allDataFilesAfterDelete = getAllDataFilesFromTableDirectory(tableName); assertThat(allDataFilesAfterDelete).hasSize(6); - computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE regionkey = 4"); + // For optimize we need to set task_writer_count to 1, otherwise it will create more than one file. + computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE regionkey = 4"); computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')"); computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')"); @@ -5190,7 +5197,8 @@ public void testOptimizeCleansUpDeleteFiles() assertThat(query("SELECT * FROM " + tableName)) .matches("SELECT * FROM nation WHERE nationkey != 7"); - computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + // For optimize we need to set task_writer_count to 1, otherwise it will create more than one file. + computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')"); computeActual(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')");