[HUDI-4085] Fixing flakiness with parquet empty batch tests in TestHoodieDeltaStreamer #5559

Merged · 1 commit · May 11, 2022
@@ -30,7 +30,7 @@
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
+import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;

import org.apache.avro.Schema;
@@ -192,7 +192,7 @@ protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath)
@BeforeEach
public void setup() throws Exception {
super.setup();
-    TestParquetDFSSourceEmptyBatch.returnEmptyBatch = false;
+    TestDataSource.returnEmptyBatch = false;
}

@AfterAll
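Note: the flakiness being fixed here stems from mutable static state on the test sources leaking across tests in the same JVM. The base class above still resets TestDataSource's static flag in @BeforeEach, while the Parquet source (last file in this diff) moves to per-instance configuration instead. A minimal, self-contained JUnit 5 sketch of the reset pattern — class and field names here are hypothetical stand-ins, not Hudi code:

// Illustration of the reset-static-state pattern used in setup() above.
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;

class StaticStateResetExample {

  // Stand-in for a test source like TestDataSource that exposes a static toggle.
  static class FlakySource {
    static boolean returnEmptyBatch = false;
  }

  @BeforeEach
  void setup() {
    // Without this reset, a test that flips the flag to true would poison
    // every test that runs after it in the same JVM.
    FlakySource.returnEmptyBatch = false;
  }

  @Test
  void startsFromCleanState() {
    assertFalse(FlakySource.returnEmptyBatch);
  }
}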
@@ -1509,9 +1509,13 @@ private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic
testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA)));
}

-  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String emptyBatchParam) throws IOException {
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc",
-        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false);
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path", emptyBatchParam);
   }
+
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "");
+  }

private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
@@ -1520,9 +1524,15 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
"partition_path");
}

+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                       String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
+        partitionPath, "");
+  }
+
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                        String propsFileName, String parquetSourceRoot, boolean addCommonProps,
-                                       String partitionPath) throws IOException {
+                                       String partitionPath, String emptyBatchParam) throws IOException {
// Properties used for testing delta-streamer with Parquet source
TypedProperties parquetProps = new TypedProperties();

@@ -1541,6 +1551,9 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
}
}
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
+    if (!StringUtils.isNullOrEmpty(emptyBatchParam)) {
+      parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, emptyBatchParam);
+    }
UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName);
}

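For reference, a minimal, self-contained sketch of the property wiring added above. The key strings are copied from this diff; the source root path and batch value are hypothetical, and hudi-common is assumed on the classpath. An empty emptyBatchParam leaves the key unset, so the source never fakes an empty batch:

// Builds the same properties the test helper writes to DFS and prints them.
import org.apache.hudi.common.config.TypedProperties;

public class EmptyBatchPropsSketch {
  public static void main(String[] args) {
    TypedProperties parquetProps = new TypedProperties();
    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", "file:///tmp/parquet-source");
    String emptyBatchParam = "1"; // make the second fetched batch empty
    if (!emptyBatchParam.isEmpty()) {
      parquetProps.setProperty("test.dfs.source.return.empty.batches.for", emptyBatchParam);
    }
    parquetProps.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}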
@@ -1549,7 +1562,7 @@ private void testParquetDFSSource(boolean useSchemaProvider, List<String> transf
}

private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
-    prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
+    prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null, testEmptyBatch ? "1" : "");
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
@@ -1563,7 +1576,6 @@ private void testParquetDFSSource(boolean useSchemaProvider, List<String> transf
if (testEmptyBatch) {
prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
// parquet source to return empty batch
-      TestParquetDFSSourceEmptyBatch.returnEmptyBatch = true;
deltaStreamer.sync();
// since we mimicked an empty batch, total records should be the same as after the first sync().
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
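Note: batch indices are zero-based, so the "1" passed above makes the second sync see an empty batch while the first sync still ingests PARQUET_NUM_RECORDS rows; that is why the record-count assertion is unchanged after the second sync(). A self-contained sketch of that counting logic, with the record count hardcoded to 100 for illustration:

// Simulates two syncs against a source configured to return an empty batch at index 1.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class EmptyBatchSimulation {
  public static void main(String[] args) {
    List<Integer> emptyBatches = Arrays.stream("1".split(","))
        .filter(s -> !s.isEmpty())
        .map(Integer::parseInt)
        .collect(Collectors.toList());

    int counter = 0;
    long totalRecords = 0;
    for (int sync = 0; sync < 2; sync++) {
      boolean empty = emptyBatches.contains(counter++);
      if (!empty) {
        totalRecords += 100; // first sync ingests all records
      }
      System.out.println("sync " + sync + " empty=" + empty + " total=" + totalRecords);
    }
    // prints: sync 0 empty=false total=100
    //         sync 1 empty=true total=100
  }
}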
@@ -21,6 +21,7 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;

@@ -29,19 +30,28 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;

public class TestParquetDFSSourceEmptyBatch extends ParquetDFSSource {

-  public static boolean returnEmptyBatch;
+  public static String RETURN_EMPTY_BATCH = "test.dfs.source.return.empty.batches.for";
+  public static String DEFAULT_RETURN_EMPTY_BATCH = "";
+  public List<Integer> emptyBatches;
+  private int counter = 0;

public TestParquetDFSSourceEmptyBatch(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
+    String[] emptyBatchesStr = props.getString(RETURN_EMPTY_BATCH, DEFAULT_RETURN_EMPTY_BATCH).split(",");
+    this.emptyBatches = Arrays.stream(emptyBatchesStr).filter(entry -> !StringUtils.isNullOrEmpty(entry)).map(entry -> Integer.parseInt(entry)).collect(Collectors.toList());
}

@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<Dataset<Row>>, String> toReturn = super.fetchNextBatch(lastCkptStr, sourceLimit);
-    if (returnEmptyBatch) {
+    if (emptyBatches.contains(counter++)) {
return Pair.of(Option.empty(), toReturn.getRight());
}
return toReturn;
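Note on the design: the empty batches are now declared up front through a config key rather than toggled through a mutable static field, so reordered or concurrently running tests can no longer interfere with one another, and multiple empty batches (e.g. "1,3") can be simulated in a single run. A self-contained sketch of the parsing behavior in the constructor above, with StringUtils.isNullOrEmpty replaced by an equivalent plain-Java check:

// Parses the comma-separated batch-index list the same way the constructor does.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ReturnEmptyBatchParsing {
  static List<Integer> parse(String value) {
    return Arrays.stream(value.split(","))
        .filter(entry -> entry != null && !entry.isEmpty())
        .map(Integer::parseInt)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(parse(""));    // [] -- default: no batch is ever empty
    System.out.println(parse("1"));   // [1] -- the second fetch returns empty
    System.out.println(parse("1,3")); // [1, 3]
  }
}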