[MINOR] Performance improvement of flink ITs with reused miniCluster (apache#7151)

* implement MiniCluster extension compatible with junit5
trushev authored and Alexey Kudinkin committed Dec 14, 2022
1 parent 2c18077 commit b57adf9
Showing 5 changed files with 97 additions and 3 deletions.
ITTestDataStreamWrite.java
@@ -29,6 +29,7 @@
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
@@ -54,6 +55,7 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -71,6 +73,7 @@
/**
* Integration test for Flink Hoodie stream sink.
*/
+@ExtendWith(FlinkMiniCluster.class)
public class ITTestDataStreamWrite extends TestLogger {

private static final Map<String, List<String>> EXPECTED = new HashMap<>();
ITTestHoodieFlinkClustering.java
@@ -39,6 +39,7 @@
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
@@ -56,6 +57,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
@@ -69,6 +71,7 @@
/**
* IT cases for {@link HoodieFlinkClusteringJob}.
*/
+@ExtendWith(FlinkMiniCluster.class)
public class ITTestHoodieFlinkClustering {

private static final Map<String, String> EXPECTED = new HashMap<>();
ITTestHoodieFlinkCompactor.java
@@ -30,6 +30,7 @@
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
@@ -43,6 +44,7 @@
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -63,6 +65,7 @@
/**
* IT cases for {@link org.apache.hudi.common.model.HoodieRecord}.
*/
+@ExtendWith(FlinkMiniCluster.class)
public class ITTestHoodieFlinkCompactor {

protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);
@@ -155,7 +158,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
-.setParallelism(compactionPlan.getOperations().size())
+.setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM)
.addSink(new CompactionCommitSink(conf))
.name("clean_commits")
.uid("uid_clean_commits")
@@ -195,6 +198,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception {
cfg.schedule = true;
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), FlinkMiniCluster.DEFAULT_PARALLELISM);

HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
asyncCompactionService.start(null);
ITTestHoodieDataSource.java
@@ -26,6 +26,7 @@
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
import org.apache.hudi.table.catalog.HoodieHiveCatalog;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
@@ -43,11 +44,11 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -76,7 +77,8 @@
/**
* IT cases for Hoodie table source and sink.
*/
-public class ITTestHoodieDataSource extends AbstractTestBase {
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestHoodieDataSource {
private TableEnvironment streamTableEnv;
private TableEnvironment batchTableEnv;

FlinkMiniCluster.java
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.utils;

import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Extension for test classes that run multiple tests and want to reuse the same Flink cluster.
 * Unlike {@link AbstractTestBase}, this class is designed to run with JUnit 5.
 */
public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, AfterEachCallback {
  private static final Logger LOG = LoggerFactory.getLogger(FlinkMiniCluster.class);

  public static final int DEFAULT_PARALLELISM = 4;

  private static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
      new MiniClusterWithClientResource(
          new MiniClusterResourceConfiguration.Builder()
              .setNumberTaskManagers(1)
              .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
              .build());

  @Override
  public void beforeAll(ExtensionContext context) throws Exception {
    MINI_CLUSTER_RESOURCE.before();
  }

  @Override
  public void afterAll(ExtensionContext context) {
    MINI_CLUSTER_RESOURCE.after();
  }

  @Override
  public void afterEach(ExtensionContext context) throws Exception {
    cleanupRunningJobs();
  }

  private void cleanupRunningJobs() throws Exception {
    if (!MINI_CLUSTER_RESOURCE.getMiniCluster().isRunning()) {
      // do nothing if the MiniCluster is not running
      LOG.warn("Mini cluster is not running after the test!");
      return;
    }

    for (JobStatusMessage job : MINI_CLUSTER_RESOURCE.getClusterClient().listJobs().get()) {
      if (!job.getJobState().isTerminalState()) {
        try {
          MINI_CLUSTER_RESOURCE.getClusterClient().cancel(job.getJobId()).get();
        } catch (Exception ignored) {
          // ignore exceptions when cancelling dangling jobs
        }
      }
    }
  }
}
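
For reference, a minimal sketch of how a test class picks up the shared cluster. The test class and pipeline below are hypothetical illustrations, not part of the commit:

import org.apache.hudi.utils.FlinkMiniCluster;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

// Registering the extension once per class: beforeAll() starts the shared
// MiniCluster, afterEach() cancels any dangling jobs between tests, and
// afterAll() tears the resource down again.
@ExtendWith(FlinkMiniCluster.class)
public class ITTestExample {

  @Test
  void runsOnSharedCluster() throws Exception {
    // MiniClusterWithClientResource.before() installs the mini cluster as the
    // context environment, so getExecutionEnvironment() should return it here.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM);
    env.fromElements(1, 2, 3).print();
    env.execute("shared-cluster-smoke-test");
  }
}

The extension starts one cluster per test class (beforeAll/afterAll) and reuses it across all tests in that class instead of spinning up a fresh local cluster for every env.execute(); the afterEach() cleanup keeps a job left over from one test from starving the next test of slots.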
