Commit 606ce3b: [HUDI-4085] Fixing flakiness with parquet empty batch tests in TestHoodieDeltaStreamer (apache#5559)
nsivabalan authored and cdmikechen committed May 13, 2022
1 parent cb1ce77 commit 606ce3b
Showing 3 changed files with 31 additions and 9 deletions.
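At a high level, the fix replaces the mutable static flag TestParquetDFSSourceEmptyBatch.returnEmptyBatch, whose state could leak across tests and make them flaky, with a per-test property (test.dfs.source.return.empty.batches.for) naming the zero-based fetch indices that should come back empty. Below is a minimal standalone sketch of that parse-and-match logic; the class and variable names here are illustrative and not part of the commit:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class EmptyBatchConfigSketch {
  public static void main(String[] args) {
    // "1,3" asks the test source to return an empty batch on its
    // second and fourth fetch (indices are zero-based).
    String emptyBatchParam = "1,3";
    List<Integer> emptyBatches = Arrays.stream(emptyBatchParam.split(","))
        .filter(entry -> !entry.isEmpty())
        .map(Integer::parseInt)
        .collect(Collectors.toList());
    int counter = 0;
    for (int fetch = 0; fetch < 4; fetch++) {
      // counter++ mirrors the source: every fetch consumes one index
      boolean empty = emptyBatches.contains(counter++);
      System.out.println("fetch " + fetch + " -> " + (empty ? "empty batch" : "real data"));
    }
  }
}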
@@ -30,7 +30,7 @@
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
+import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.avro.Schema;
@@ -192,7 +192,7 @@ protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath)
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
-    TestParquetDFSSourceEmptyBatch.returnEmptyBatch = false;
+    TestDataSource.returnEmptyBatch = false;
   }
 
   @AfterAll
@@ -1509,9 +1509,13 @@ private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic
     testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA)));
   }
 
-  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String emptyBatchParam) throws IOException {
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc",
-        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false);
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path", emptyBatchParam);
   }
 
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "");
+  }
+
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
@@ -1520,9 +1524,15 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
         "partition_path");
   }
 
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                       String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
+        partitionPath, "");
+  }
+
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                        String propsFileName, String parquetSourceRoot, boolean addCommonProps,
-                                       String partitionPath) throws IOException {
+                                       String partitionPath, String emptyBatchParam) throws IOException {
     // Properties used for testing delta-streamer with Parquet source
     TypedProperties parquetProps = new TypedProperties();
 
@@ -1541,6 +1551,9 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
       }
     }
     parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
+    if (!StringUtils.isNullOrEmpty(emptyBatchParam)) {
+      parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, emptyBatchParam);
+    }
     UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName);
   }
 
@@ -1549,7 +1562,7 @@ private void testParquetDFSSource(boolean useSchemaProvider, List<String> transf
   }
 
   private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
-    prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
+    prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null, testEmptyBatch ? "1" : "");
     String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
@@ -1563,7 +1576,6 @@ private void testParquetDFSSource(boolean useSchemaProvider, List<String> transf
     if (testEmptyBatch) {
       prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
       // parquet source to return empty batch
-      TestParquetDFSSourceEmptyBatch.returnEmptyBatch = true;
       deltaStreamer.sync();
       // since we mimic'ed empty batch, total records should be same as first sync().
       TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
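Because the empty-batch behavior now rides in on the props file, a test opts in simply by passing "1" (suppress the second fetch) through prepareParquetDFSSource, as testParquetDFSSource does above; nothing static needs resetting afterwards. A condensed sketch of the resulting test flow, reusing the helpers shown in this diff (HoodieDeltaStreamer construction elided; see testParquetDFSSource above):

// batch 0 ingests the initial parquet file normally
prepareParquetDFSSource(false, false, "1");
deltaStreamer.sync();
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
// batch 1 is forced empty, so even though 2.parquet now exists on DFS,
// the record count must stay where the first sync left it
prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
deltaStreamer.sync();
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);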
@@ -21,6 +21,7 @@
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
@@ -29,19 +30,28 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
 public class TestParquetDFSSourceEmptyBatch extends ParquetDFSSource {
 
-  public static boolean returnEmptyBatch;
+  public static String RETURN_EMPTY_BATCH = "test.dfs.source.return.empty.batches.for";
+  public static String DEFAULT_RETURN_EMPTY_BATCH = "";
+  public List<Integer> emptyBatches;
+  private int counter = 0;
 
   public TestParquetDFSSourceEmptyBatch(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                                         SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
+    String[] emptyBatchesStr = props.getString(RETURN_EMPTY_BATCH, DEFAULT_RETURN_EMPTY_BATCH).split(",");
+    this.emptyBatches = Arrays.stream(emptyBatchesStr).filter(entry -> !StringUtils.isNullOrEmpty(entry)).map(entry -> Integer.parseInt(entry)).collect(Collectors.toList());
   }
 
   @Override
   public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
     Pair<Option<Dataset<Row>>, String> toReturn = super.fetchNextBatch(lastCkptStr, sourceLimit);
-    if (returnEmptyBatch) {
+    if (emptyBatches.contains(counter++)) {
       return Pair.of(Option.empty(), toReturn.getRight());
     }
     return toReturn;
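Two details of the override are worth noting: counter increments on every fetchNextBatch call, so the configured indices count fetches over the lifetime of the source instance, and the checkpoint string from the real fetch (toReturn.getRight()) is still returned even when the data is suppressed, which lets the delta streamer commit and advance past the "empty" batch instead of re-reading it. Wiring the source up by hand takes only the one property; a sketch assuming the classes in this diff are on the test classpath (the helper name is illustrative):

import org.apache.hudi.common.config.TypedProperties;

public class EmptyBatchPropsExample {
  public static TypedProperties propsForEmptyBatches(String parquetSourceRoot) {
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
    // suppress the second and third fetches (zero-based indices 1 and 2);
    // every other fetch behaves exactly like the parent ParquetDFSSource
    props.setProperty("test.dfs.source.return.empty.batches.for", "1,2");
    return props;
  }
}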
