diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index 193c0abcd8d62..6ab4b1b6e0d48 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -29,6 +29,7 @@ import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.HoodiePipeline; import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.FlinkMiniCluster; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; import org.apache.hudi.utils.TestUtils; @@ -54,6 +55,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.TestLogger; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -71,6 +73,7 @@ /** * Integration test for Flink Hoodie stream sink. */ +@ExtendWith(FlinkMiniCluster.class) public class ITTestDataStreamWrite extends TestLogger { private static final Map> EXPECTED = new HashMap<>(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java index f50f5748be702..4c0fe82e44bf8 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java @@ -39,6 +39,7 @@ import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.FlinkMiniCluster; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; import org.apache.hudi.utils.TestSQL; @@ -56,6 +57,7 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import java.io.File; @@ -69,6 +71,7 @@ /** * IT cases for {@link HoodieFlinkClusteringJob}. */ +@ExtendWith(FlinkMiniCluster.class) public class ITTestHoodieFlinkClustering { private static final Map EXPECTED = new HashMap<>(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index 05d9d198541ab..ee9285d60a75d 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -30,6 +30,7 @@ import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.FlinkMiniCluster; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; import org.apache.hudi.utils.TestSQL; @@ -43,6 +44,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -63,6 +65,7 @@ /** * IT cases for {@link org.apache.hudi.common.model.HoodieRecord}. */ +@ExtendWith(FlinkMiniCluster.class) public class ITTestHoodieFlinkCompactor { protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class); @@ -155,7 +158,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception { .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new ProcessOperator<>(new CompactFunction(conf))) - .setParallelism(compactionPlan.getOperations().size()) + .setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM) .addSink(new CompactionCommitSink(conf)) .name("clean_commits") .uid("uid_clean_commits") @@ -195,6 +198,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce cfg.schedule = true; Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); + conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), FlinkMiniCluster.DEFAULT_PARALLELISM); HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env); asyncCompactionService.start(null); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index ebdcb9839f723..1d1e7b3fc6538 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -26,6 +26,7 @@ import org.apache.hudi.table.catalog.HoodieCatalogTestUtils; import org.apache.hudi.table.catalog.HoodieHiveCatalog; import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.FlinkMiniCluster; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; import org.apache.hudi.utils.TestSQL; @@ -43,11 +44,11 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.data.RowData; -import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.types.Row; import org.apache.flink.util.CollectionUtil; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -76,7 +77,8 @@ /** * IT cases for Hoodie table source and sink. */ -public class ITTestHoodieDataSource extends AbstractTestBase { +@ExtendWith(FlinkMiniCluster.class) +public class ITTestHoodieDataSource { private TableEnvironment streamTableEnv; private TableEnvironment batchTableEnv; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java new file mode 100644 index 0000000000000..96d07cd6565bd --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utils; + +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.test.util.MiniClusterWithClientResource; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class for tests that run multiple tests and want to reuse the same Flink cluster. + * Unlike {@link AbstractTestBase}, this class is designed to run with JUnit 5. + */ +public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, AfterEachCallback { + private static final Logger LOG = LoggerFactory.getLogger(FlinkMiniCluster.class); + + public static final int DEFAULT_PARALLELISM = 4; + + private static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .build()); + + @Override + public void beforeAll(ExtensionContext context) throws Exception { + MINI_CLUSTER_RESOURCE.before(); + } + + @Override + public void afterAll(ExtensionContext context) { + MINI_CLUSTER_RESOURCE.after(); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + cleanupRunningJobs(); + } + + private void cleanupRunningJobs() throws Exception { + if (!MINI_CLUSTER_RESOURCE.getMiniCluster().isRunning()) { + // do nothing if the MiniCluster is not running + LOG.warn("Mini cluster is not running after the test!"); + return; + } + + for (JobStatusMessage path : MINI_CLUSTER_RESOURCE.getClusterClient().listJobs().get()) { + if (!path.getJobState().isTerminalState()) { + try { + MINI_CLUSTER_RESOURCE.getClusterClient().cancel(path.getJobId()).get(); + } catch (Exception ignored) { + // ignore exceptions when cancelling dangling jobs + } + } + } + } +}