diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
index afbedc0de39c4..4222754a19499 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
@@ -63,6 +63,11 @@ private HoodieWriteConfig getWriteConfig() {
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder()
             .withLockProvider(InProcessLockProvider.class)
+            .withLockWaitTimeInMillis(50L)
+            .withNumRetries(2)
+            .withRetryWaitTimeInMillis(10L)
+            .withClientNumRetries(2)
+            .withClientRetryWaitTimeInMillis(10L)
             .build())
         .build();
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 458af3ad9e60b..6cc936df9807e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -92,7 +92,9 @@ public class HoodieClientTestUtils {
    */
   public static SparkConf getSparkConfForTest(String appName) {
     SparkConf sparkConf = new SparkConf().setAppName(appName)
-        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
+        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[4]")
+        .set("spark.sql.shuffle.partitions", "4")
+        .set("spark.default.parallelism", "4");
 
     String evlogDir = System.getProperty("SPARK_EVLOG_DIR");
     if (evlogDir != null) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index cb7b2e6b3c43a..4684747161ecb 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -19,12 +19,6 @@
 
 package org.apache.hudi.testutils;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -59,6 +53,13 @@
 import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
 import org.apache.hudi.testutils.providers.SparkProvider;
 import org.apache.hudi.timeline.service.TimelineService;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index 52413ce938456..81217ce904164 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -20,13 +20,13 @@
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
-import org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
-import org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
-import org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
+import org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
+import org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
+import org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.postprocessor.add.BaseSchemaPostProcessorConfig;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.transform.FlatteningTransformer;
@@ -34,6 +34,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -67,6 +68,11 @@ private static Stream<Arguments> configParams() {
     return Stream.of(types).map(Arguments::of);
   }
 
+  @BeforeAll
+  public static void setupOnce() throws Exception {
+    initTestServices();
+  }
+
   @Test
   public void testPostProcessor() throws IOException {
     properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
index 62aebe3d3ee20..4c8b264fe1685 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
@@ -36,6 +36,7 @@
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.storage.StorageLevel;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -64,6 +65,11 @@ public class TestJdbcSource extends UtilitiesTestBase {
   private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator();
   private static Connection connection;
 
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    UtilitiesTestBase.initTestServices(false, false, false);
+  }
+
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index 2ac1b8b0bf677..113805d24dbb5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -39,9 +39,7 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -57,37 +55,27 @@
 public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase {
 
-  private static final String TEST_TOPIC_NAME = "hoodie_test";
+  private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
   private final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
-  private KafkaTestUtils testUtils;
+  private static KafkaTestUtils testUtils;
 
   @BeforeAll
   public static void initClass() throws Exception {
     UtilitiesTestBase.initTestServices();
+    testUtils = new KafkaTestUtils();
+    testUtils.setup();
   }
 
   @AfterAll
   public static void cleanupClass() {
     UtilitiesTestBase.cleanupClass();
-  }
-
-  @BeforeEach
-  public void setup() throws Exception {
-    super.setup();
-    testUtils = new KafkaTestUtils();
-    testUtils.setup();
-  }
-
-  @AfterEach
-  public void teardown() throws Exception {
-    super.teardown();
     testUtils.teardown();
   }
 
   private TypedProperties createPropsForJsonSource() {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
+    props.setProperty("hoodie.deltastreamer.source.kafka.topic", testTopicName);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("auto.offset.reset", "earliest");
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -115,13 +103,13 @@ public void testDebeziumEvents(Operation operation) throws Exception {
     String sourceClass = getSourceClass();
 
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    testUtils.createTopic(testTopicName, 2);
     TypedProperties props = createPropsForJsonSource();
     SchemaProvider schemaProvider = new MockSchemaRegistryProvider(props, jsc, this);
     SourceFormatAdapter debeziumSource = new SourceFormatAdapter(UtilHelpers.createSource(sourceClass, props, jsc, sparkSession, schemaProvider, metrics));
-    testUtils.sendMessages(TEST_TOPIC_NAME, new String[] {generateDebeziumEvent(operation).toString()});
+    testUtils.sendMessages(testTopicName, new String[] {generateDebeziumEvent(operation).toString()});
 
     InputBatch<Dataset<Row>> fetch = debeziumSource.fetchNewDataInRowFormat(Option.empty(), 10);
     assertEquals(1, fetch.getBatch().get().count());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index eff9b24b2b380..c3018bb7baf55 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -30,8 +30,8 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.apache.spark.streaming.kafka010.OffsetRange;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.util.UUID;
@@ -47,18 +47,18 @@
  */
 public class TestKafkaOffsetGen {
 
-  private static String TEST_TOPIC_NAME = "hoodie_test";
-  private KafkaTestUtils testUtils;
+  private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
+  private static KafkaTestUtils testUtils;
   private HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
 
-  @BeforeEach
-  public void setup() throws Exception {
+  @BeforeAll
+  public static void setup() throws Exception {
     testUtils = new KafkaTestUtils();
     testUtils.setup();
   }
 
-  @AfterEach
-  public void teardown() throws Exception {
+  @AfterAll
+  public static void teardown() throws Exception {
     testUtils.teardown();
   }
 
@@ -66,7 +66,7 @@ private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
     TypedProperties props = new TypedProperties();
     props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
     props.put("auto.offset.reset", autoOffsetReset);
-    props.put("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
+    props.put("hoodie.deltastreamer.source.kafka.topic", testTopicName);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("key.deserializer", StringDeserializer.class.getName());
     props.setProperty("value.deserializer", StringDeserializer.class.getName());
@@ -77,8 +77,8 @@ private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
   @Test
   public void testGetNextOffsetRangesFromEarliest() {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.createTopic(TEST_TOPIC_NAME, 1);
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.createTopic(testTopicName, 1);
+    testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
@@ -95,8 +95,8 @@ public void testGetNextOffsetRangesFromEarliest() {
   @Test
   public void testGetNextOffsetRangesFromLatest() {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.createTopic(TEST_TOPIC_NAME, 1);
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.createTopic(testTopicName, 1);
+    testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
     assertEquals(1, nextOffsetRanges.length);
@@ -106,10 +106,10 @@ public void testGetNextOffsetRangesFromLatest() {
   @Test
   public void testGetNextOffsetRangesFromCheckpoint() {
-    String lastCheckpointString = TEST_TOPIC_NAME + ",0:250";
+    String lastCheckpointString = testTopicName + ",0:250";
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.createTopic(TEST_TOPIC_NAME, 1);
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.createTopic(testTopicName, 1);
+    testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500, metrics);
@@ -121,8 +121,8 @@ public void testGetNextOffsetRangesFromCheckpoint() {
   @Test
   public void testGetNextOffsetRangesFromTimestampCheckpointType() {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.createTopic(TEST_TOPIC_NAME, 1);
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.createTopic(testTopicName, 1);
+    testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "timestamp"));
@@ -135,8 +135,8 @@ public void testGetNextOffsetRangesFromTimestampCheckpointType() {
   @Test
   public void testGetNextOffsetRangesFromMultiplePartitions() {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.createTopic(testTopicName, 2);
+    testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499, metrics);
     assertEquals(2, nextOffsetRanges.length);
@@ -149,10 +149,10 @@ public void testGetNextOffsetRangesFromMultiplePartitions() {
   @Test
   public void testGetNextOffsetRangesFromGroup() {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.createTopic(testTopicName, 2);
+    testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
-    String lastCheckpointString = TEST_TOPIC_NAME + ",0:250,1:249";
+    String lastCheckpointString = testTopicName + ",0:250,1:249";
     kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
     // don't pass lastCheckpointString as we want to read from group committed offset
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
@@ -174,7 +174,7 @@ public void testGetNextOffsetRangesFromGroup() {
   public void testCheckTopicExists() {
     TypedProperties props = getConsumerConfigs("latest", "string");
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
-    testUtils.createTopic(TEST_TOPIC_NAME, 1);
+    testUtils.createTopic(testTopicName, 1);
     boolean topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer(props));
     assertTrue(topicExists);
     props.put("hoodie.deltastreamer.source.kafka.topic", "random");
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 534e14cc73585..cc61a35886df7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -114,10 +114,10 @@ public class UtilitiesTestBase {
   protected static ZookeeperTestService zookeeperTestService;
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
-  protected transient JavaSparkContext jsc;
-  protected transient HoodieSparkEngineContext context;
-  protected transient SparkSession sparkSession;
-  protected transient SQLContext sqlContext;
+  protected static JavaSparkContext jsc;
+  protected static HoodieSparkEngineContext context;
+  protected static SparkSession sparkSession;
+  protected static SQLContext sqlContext;
 
   @BeforeAll
   public static void setLogLevel() {
@@ -155,6 +155,11 @@ public static void initTestServices(boolean needsHdfs, boolean needsHive, boolean needsZookeeper) throws Exception {
       zookeeperTestService = new ZookeeperTestService(hadoopConf);
       zookeeperTestService.start();
     }
+
+    jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() + "-hoodie", "local[4]");
+    context = new HoodieSparkEngineContext(jsc);
+    sqlContext = new SQLContext(jsc);
+    sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
   }
 
   @AfterAll
@@ -175,20 +180,6 @@ public static void cleanupClass() {
       zookeeperTestService.stop();
       zookeeperTestService = null;
     }
-  }
-
-  @BeforeEach
-  public void setup() throws Exception {
-    TestDataSource.initDataGen();
-    jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
-    context = new HoodieSparkEngineContext(jsc);
-    sqlContext = new SQLContext(jsc);
-    sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
-  }
-
-  @AfterEach
-  public void teardown() throws Exception {
-    TestDataSource.resetDataGen();
     if (jsc != null) {
       jsc.stop();
       jsc = null;
@@ -202,6 +193,16 @@ public void teardown() throws Exception {
     }
   }
 
+  @BeforeEach
+  public void setup() throws Exception {
+    TestDataSource.initDataGen();
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    TestDataSource.resetDataGen();
+  }
+
   /**
    * Helper to get hive sync config.
   *
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index d49d197fd57f5..e7843221e0b1e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -37,8 +37,6 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -65,22 +63,12 @@ public static void initClass() throws Exception {
     UtilitiesTestBase.initTestServices(true, false, false);
   }
 
-  @AfterAll
-  public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
-  }
-
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
     schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
   }
 
-  @AfterEach
-  public void teardown() throws Exception {
-    super.teardown();
-  }
-
   /**
    * Prepares the specific {@link Source} to test, by passing in necessary configurations.
   *
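
Reviewer note (illustrative, not part of the patch): the files above all apply the same refactoring. Expensive fixtures, the Spark context in UtilitiesTestBase and the embedded Kafka broker in the Kafka-based tests, move from per-test @BeforeEach/@AfterEach methods into static fields managed once per class via JUnit 5 @BeforeAll/@AfterAll, and test isolation now comes from uniquely named resources (topic names suffixed with UUID.randomUUID()) instead of tearing the services down between tests. A minimal sketch of that lifecycle pattern follows; PerClassFixtureExampleTest and ExpensiveService are hypothetical stand-ins for KafkaTestUtils or JavaSparkContext, not names from the patch:

import java.util.UUID;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class PerClassFixtureExampleTest {

  // Hypothetical stand-in for an expensive fixture such as a Kafka broker or Spark context.
  static class ExpensiveService {
    void start() { /* boot the service once per test class */ }
    void stop() { /* release it once per test class */ }
    void createResource(String name) { /* e.g. create a Kafka topic */ }
  }

  // Static, because @BeforeAll/@AfterAll methods run once per class in the
  // default per-method lifecycle and must themselves be static.
  private static ExpensiveService service;

  // JUnit 5 creates a fresh test instance per test method, so every test
  // gets its own UUID-suffixed name and never collides with another test.
  private final String resourceName = "test_resource_" + UUID.randomUUID();

  @BeforeAll
  static void startService() {
    service = new ExpensiveService();
    service.start();
  }

  @AfterAll
  static void stopService() {
    service.stop();
  }

  @Test
  void worksAgainstIsolatedResource() {
    // Isolation comes from the unique name, not from restarting the service.
    service.createResource(resourceName);
  }
}

The trade-off is that startup cost drops from once per test method to once per test class, while correctness now depends on tests never assuming a pristine service, which appears to be why the patch replaces the shared TEST_TOPIC_NAME constant with a per-instance, UUID-suffixed testTopicName.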