[HUDI-5198] Reduce test run time in hudi-utilities and locking related tests (apache#7180)
the-other-tim-brown authored and fengjian committed Apr 5, 2023
1 parent a6f9516 commit 24f9780
Showing 9 changed files with 80 additions and 83 deletions.
@@ -63,6 +63,11 @@ private HoodieWriteConfig getWriteConfig() {
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(InProcessLockProvider.class)
.withLockWaitTimeInMillis(50L)
.withNumRetries(2)
.withRetryWaitTimeInMillis(10L)
.withClientNumRetries(2)
.withClientRetryWaitTimeInMillis(10L)
.build())
.build();
}
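
For reference, a minimal sketch of the tightened lock settings in isolation. The builder methods and values are the ones used in the hunk above; the wrapper class, base path, and import locations (taken from recent Hudi versions) are illustrative assumptions, not part of this commit:

// Sketch only: a write config whose optimistic-concurrency locking fails fast,
// so contended tests spend milliseconds (not seconds) waiting on locks.
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class FastFailLockConfigSketch {
  static HoodieWriteConfig testWriteConfig() {
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie-lock-test") // placeholder path for the sketch
        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
        .withLockConfig(HoodieLockConfig.newBuilder()
            .withLockProvider(InProcessLockProvider.class)
            .withLockWaitTimeInMillis(50L)     // short wait before giving up on a lock
            .withNumRetries(2)                 // few retries keep worst-case time bounded
            .withRetryWaitTimeInMillis(10L)
            .withClientNumRetries(2)
            .withClientRetryWaitTimeInMillis(10L)
            .build())
        .build();
  }
}
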
@@ -92,7 +92,9 @@ public class HoodieClientTestUtils {
*/
public static SparkConf getSparkConfForTest(String appName) {
SparkConf sparkConf = new SparkConf().setAppName(appName)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[4]")
.set("spark.sql.shuffle.partitions", "4")
.set("spark.default.parallelism", "4");

String evlogDir = System.getProperty("SPARK_EVLOG_DIR");
if (evlogDir != null) {
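
The same cap on test parallelism, shown as a standalone helper. Every Spark property comes from the hunk above; only the class name and app-name parameter are placeholders:

// Sketch: a SparkConf for tests that limits executor threads, shuffle
// partitions, and default parallelism to 4, so tiny test datasets are not
// spread across needlessly many tasks.
import org.apache.spark.SparkConf;

public class LowParallelismSparkConfSketch {
  static SparkConf forTest(String appName) {
    return new SparkConf()
        .setAppName(appName)
        .setMaster("local[4]")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.sql.shuffle.partitions", "4")
        .set("spark.default.parallelism", "4");
  }
}
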
@@ -19,12 +19,6 @@

package org.apache.hudi.testutils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
@@ -59,6 +53,13 @@
import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -20,20 +20,21 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.add.BaseSchemaPostProcessorConfig;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.transform.FlatteningTransformer;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -67,6 +68,11 @@ private static Stream<Arguments> configParams() {
return Stream.of(types).map(Arguments::of);
}

@BeforeAll
public static void setupOnce() throws Exception {
initTestServices();
}

@Test
public void testPostProcessor() throws IOException {
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
@@ -36,6 +36,7 @@
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.storage.StorageLevel;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@@ -64,6 +65,11 @@ public class TestJdbcSource extends UtilitiesTestBase {
private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator();
private static Connection connection;

@BeforeAll
public static void beforeAll() throws Exception {
UtilitiesTestBase.initTestServices(false, false, false);
}

@BeforeEach
public void setup() throws Exception {
super.setup();
@@ -39,9 +39,7 @@
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -57,37 +55,27 @@

public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase {

private static final String TEST_TOPIC_NAME = "hoodie_test";
private final String testTopicName = "hoodie_test_" + UUID.randomUUID();

private final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
private KafkaTestUtils testUtils;
private static KafkaTestUtils testUtils;

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices();
testUtils = new KafkaTestUtils();
testUtils.setup();
}

@AfterAll
public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
}

@BeforeEach
public void setup() throws Exception {
super.setup();
testUtils = new KafkaTestUtils();
testUtils.setup();
}

@AfterEach
public void teardown() throws Exception {
super.teardown();
testUtils.teardown();
}

private TypedProperties createPropsForJsonSource() {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("hoodie.deltastreamer.source.kafka.topic", testTopicName);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "earliest");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -115,13 +103,13 @@ public void testDebeziumEvents(Operation operation) throws Exception {
String sourceClass = getSourceClass();

// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
testUtils.createTopic(testTopicName, 2);
TypedProperties props = createPropsForJsonSource();

SchemaProvider schemaProvider = new MockSchemaRegistryProvider(props, jsc, this);
SourceFormatAdapter debeziumSource = new SourceFormatAdapter(UtilHelpers.createSource(sourceClass, props, jsc, sparkSession, schemaProvider, metrics));

testUtils.sendMessages(TEST_TOPIC_NAME, new String[] {generateDebeziumEvent(operation).toString()});
testUtils.sendMessages(testTopicName, new String[] {generateDebeziumEvent(operation).toString()});

InputBatch<Dataset<Row>> fetch = debeziumSource.fetchNewDataInRowFormat(Option.empty(), 10);
assertEquals(1, fetch.getBatch().get().count());
@@ -30,8 +30,8 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.UUID;
@@ -47,26 +47,26 @@
*/
public class TestKafkaOffsetGen {

private static String TEST_TOPIC_NAME = "hoodie_test";
private KafkaTestUtils testUtils;
private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
private static KafkaTestUtils testUtils;
private HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);

@BeforeEach
public void setup() throws Exception {
@BeforeAll
public static void setup() throws Exception {
testUtils = new KafkaTestUtils();
testUtils.setup();
}

@AfterEach
public void teardown() throws Exception {
@AfterAll
public static void teardown() throws Exception {
testUtils.teardown();
}

private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
TypedProperties props = new TypedProperties();
props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
props.put("auto.offset.reset", autoOffsetReset);
props.put("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.put("hoodie.deltastreamer.source.kafka.topic", testTopicName);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", StringDeserializer.class.getName());
@@ -77,8 +77,8 @@ private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
@Test
public void testGetNextOffsetRangesFromEarliest() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(TEST_TOPIC_NAME, 1);
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.createTopic(testTopicName, 1);
testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));

KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
@@ -95,8 +95,8 @@ public void testGetNextOffsetRangesFromEarliest() {
@Test
public void testGetNextOffsetRangesFromLatest() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(TEST_TOPIC_NAME, 1);
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.createTopic(testTopicName, 1);
testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));
OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
assertEquals(1, nextOffsetRanges.length);
@@ -106,10 +106,10 @@ public void testGetNextOffsetRangesFromLatest() {

@Test
public void testGetNextOffsetRangesFromCheckpoint() {
String lastCheckpointString = TEST_TOPIC_NAME + ",0:250";
String lastCheckpointString = testTopicName + ",0:250";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(TEST_TOPIC_NAME, 1);
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.createTopic(testTopicName, 1);
testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));

OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500, metrics);
@@ -121,8 +121,8 @@ public void testGetNextOffsetRangesFromCheckpoint() {
@Test
public void testGetNextOffsetRangesFromTimestampCheckpointType() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(TEST_TOPIC_NAME, 1);
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.createTopic(testTopicName, 1);
testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));

KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "timestamp"));

@@ -135,8 +135,8 @@ public void testGetNextOffsetRangesFromTimestampCheckpointType() {
@Test
public void testGetNextOffsetRangesFromMultiplePartitions() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(TEST_TOPIC_NAME, 2);
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.createTopic(testTopicName, 2);
testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499, metrics);
assertEquals(2, nextOffsetRanges.length);
@@ -149,10 +149,10 @@ public void testGetNextOffsetRangesFromMultiplePartitions() {
@Test
public void testGetNextOffsetRangesFromGroup() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(TEST_TOPIC_NAME, 2);
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.createTopic(testTopicName, 2);
testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
String lastCheckpointString = TEST_TOPIC_NAME + ",0:250,1:249";
String lastCheckpointString = testTopicName + ",0:250,1:249";
kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
// don't pass lastCheckpointString as we want to read from group committed offset
OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
@@ -174,7 +174,7 @@ public void testGetNextOffsetRangesFromGroup() {
public void testCheckTopicExists() {
TypedProperties props = getConsumerConfigs("latest", "string");
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
testUtils.createTopic(TEST_TOPIC_NAME, 1);
testUtils.createTopic(testTopicName, 1);
boolean topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer(props));
assertTrue(topicExists);
props.put("hoodie.deltastreamer.source.kafka.topic", "random");
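
The two Kafka-backed suites above now share the same lifecycle: one embedded broker per class, created in @BeforeAll and torn down in @AfterAll, with a UUID-suffixed topic per test instance so runs never collide on topic state. A condensed sketch of that pattern, assuming the KafkaTestUtils helper from spark-streaming-kafka-0-10 used in the diff; the class name and message payload are placeholders:

// Sketch: reuse one embedded Kafka broker for a whole test class and isolate
// each test instance with a unique topic name.
import java.util.UUID;

import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class SharedKafkaBrokerSketch {

  private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
  private static KafkaTestUtils testUtils;

  @BeforeAll
  public static void startBroker() {
    testUtils = new KafkaTestUtils();
    testUtils.setup(); // boots the embedded ZooKeeper + Kafka broker once per class
  }

  @AfterAll
  public static void stopBroker() {
    testUtils.teardown();
  }

  @Test
  public void roundTrip() {
    testUtils.createTopic(testTopicName, 1);
    testUtils.sendMessages(testTopicName, new String[] {"{\"key\": \"value\"}"});
    // consume from testUtils.brokerAddress() and assert on the payload here
  }
}
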
@@ -114,10 +114,10 @@ public class UtilitiesTestBase {
protected static ZookeeperTestService zookeeperTestService;
private static final ObjectMapper MAPPER = new ObjectMapper();

protected transient JavaSparkContext jsc;
protected transient HoodieSparkEngineContext context;
protected transient SparkSession sparkSession;
protected transient SQLContext sqlContext;
protected static JavaSparkContext jsc;
protected static HoodieSparkEngineContext context;
protected static SparkSession sparkSession;
protected static SQLContext sqlContext;

@BeforeAll
public static void setLogLevel() {
@@ -155,6 +155,11 @@ public static void initTestServices(boolean needsHdfs, boolean needsHive, boolea
zookeeperTestService = new ZookeeperTestService(hadoopConf);
zookeeperTestService.start();
}

jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() + "-hoodie", "local[4]");
context = new HoodieSparkEngineContext(jsc);
sqlContext = new SQLContext(jsc);
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
}

@AfterAll
@@ -175,20 +180,6 @@ public static void cleanupClass() {
zookeeperTestService.stop();
zookeeperTestService = null;
}
}

@BeforeEach
public void setup() throws Exception {
TestDataSource.initDataGen();
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
context = new HoodieSparkEngineContext(jsc);
sqlContext = new SQLContext(jsc);
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
}

@AfterEach
public void teardown() throws Exception {
TestDataSource.resetDataGen();
if (jsc != null) {
jsc.stop();
jsc = null;
Expand All @@ -202,6 +193,16 @@ public void teardown() throws Exception {
}
}

@BeforeEach
public void setup() throws Exception {
TestDataSource.initDataGen();
}

@AfterEach
public void teardown() throws Exception {
TestDataSource.resetDataGen();
}

/**
* Helper to get hive sync config.
*
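
Net effect of the UtilitiesTestBase change above: the Spark context, engine context, SQL context, and session become static, are built once per test class inside initTestServices, and the per-test hooks only reset lightweight state. A reduced sketch of that lifecycle with the HDFS/Hive/ZooKeeper service setup elided; buildSparkContext and the Hudi/Spark classes are used as in the diff, though the exact package locations are an assumption based on recent Hudi versions:

// Sketch: build Spark once per test class and reuse it across tests; only
// cheap per-test state is reset between test methods.
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;

public class SharedSparkTestBaseSketch {

  protected static JavaSparkContext jsc;
  protected static HoodieSparkEngineContext context;
  protected static SparkSession sparkSession;
  protected static SQLContext sqlContext;

  // Subclasses call this from their own @BeforeAll, mirroring initTestServices(...).
  public static void initSpark() {
    jsc = UtilHelpers.buildSparkContext(SharedSparkTestBaseSketch.class.getName() + "-hoodie", "local[4]");
    context = new HoodieSparkEngineContext(jsc);
    sqlContext = new SQLContext(jsc);
    sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
  }

  @AfterAll
  public static void stopSpark() {
    if (sparkSession != null) {
      sparkSession.stop();
      sparkSession = null;
    }
    if (jsc != null) {
      jsc.stop();
      jsc = null;
    }
  }

  @BeforeEach
  public void setup() {
    // reset per-test fixtures only; the shared Spark context stays up
  }
}
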
@@ -37,8 +37,6 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -65,22 +63,12 @@ public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(true, false, false);
}

@AfterAll
public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
}

@BeforeEach
public void setup() throws Exception {
super.setup();
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
}

@AfterEach
public void teardown() throws Exception {
super.teardown();
}

/**
* Prepares the specific {@link Source} to test, by passing in necessary configurations.
*