diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java index 4ac6f73d880fb..1a1cf39dbfef6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java @@ -30,7 +30,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; -import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch; +import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.avro.Schema; @@ -192,7 +192,7 @@ protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath) @BeforeEach public void setup() throws Exception { super.setup(); - TestParquetDFSSourceEmptyBatch.returnEmptyBatch = false; + TestDataSource.returnEmptyBatch = false; } @AfterAll diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 3eaec56cc2764..2707e8392cb33 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -1509,9 +1509,13 @@ private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA))); } - private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException { + private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String emptyBatchParam) throws IOException { prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", - PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false); + PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path", emptyBatchParam); + } + + private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException { + prepareParquetDFSSource(useSchemaProvider, hasTransformer, ""); } private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, @@ -1520,9 +1524,15 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans "partition_path"); } + private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, + String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath) throws IOException { + prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps, + partitionPath, ""); + } + private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, String propsFileName, String parquetSourceRoot, boolean addCommonProps, - String partitionPath) throws IOException { + String partitionPath, String emptyBatchParam) throws IOException { // Properties used for testing delta-streamer with Parquet source TypedProperties parquetProps = new TypedProperties(); @@ -1541,6 +1551,9 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans } } parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot); + if (!StringUtils.isNullOrEmpty(emptyBatchParam)) { + parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, emptyBatchParam); + } UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName); } @@ -1549,7 +1562,7 @@ private void testParquetDFSSource(boolean useSchemaProvider, List transf } private void testParquetDFSSource(boolean useSchemaProvider, List transformerClassNames, boolean testEmptyBatch) throws Exception { - prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null); + prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null, testEmptyBatch ? "1" : ""); String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName() @@ -1563,7 +1576,6 @@ private void testParquetDFSSource(boolean useSchemaProvider, List transf if (testEmptyBatch) { prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null); // parquet source to return empty batch - TestParquetDFSSourceEmptyBatch.returnEmptyBatch = true; deltaStreamer.sync(); // since we mimic'ed empty batch, total records should be same as first sync(). TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSourceEmptyBatch.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSourceEmptyBatch.java index 3129e91a9d3e0..11c3f4c8f95ee 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSourceEmptyBatch.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSourceEmptyBatch.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -29,19 +30,28 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + public class TestParquetDFSSourceEmptyBatch extends ParquetDFSSource { - public static boolean returnEmptyBatch; + public static String RETURN_EMPTY_BATCH = "test.dfs.source.return.empty.batches.for"; + public static String DEFAULT_RETURN_EMPTY_BATCH = ""; + public List emptyBatches; + private int counter = 0; public TestParquetDFSSourceEmptyBatch(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); + String[] emptyBatchesStr = props.getString(RETURN_EMPTY_BATCH, DEFAULT_RETURN_EMPTY_BATCH).split(","); + this.emptyBatches = Arrays.stream(emptyBatchesStr).filter(entry -> !StringUtils.isNullOrEmpty(entry)).map(entry -> Integer.parseInt(entry)).collect(Collectors.toList()); } @Override public Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { Pair>, String> toReturn = super.fetchNextBatch(lastCkptStr, sourceLimit); - if (returnEmptyBatch) { + if (emptyBatches.contains(counter++)) { return Pair.of(Option.empty(), toReturn.getRight()); } return toReturn;