Skip to content

Commit

Permalink
[HUDI-4686] Flip option 'write.ignore.failed' to default false (#6467)
Browse files Browse the repository at this point in the history
Also fix the flaky test
  • Loading branch information
danny0405 authored Aug 23, 2022
1 parent a9982d1 commit 1879efa
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private FlinkOptions() {

public static final String NO_PRE_COMBINE = "no_precombine";
public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
.key("payload.ordering.field")
.key("precombine.field")
.stringType()
.defaultValue("ts")
.withFallbackKeys("write.precombine.field")
Expand Down Expand Up @@ -330,9 +330,9 @@ private FlinkOptions() {
public static final ConfigOption<Boolean> IGNORE_FAILED = ConfigOptions
.key("write.ignore.failed")
.booleanType()
.defaultValue(true)
.defaultValue(false)
.withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
+ "By default true (in favor of streaming progressing over data integrity)");
+ "By default false");

public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
.key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,11 @@ public void testWriteMergeOnReadWithCompaction(String indexType) throws Exceptio
}

@Test
public void testWriteMergeOnReadWithClustering() throws Exception {
public void testWriteCopyOnWriteWithClustering() throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.OPERATION, "insert");
conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());

testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
}
Expand Down Expand Up @@ -281,36 +280,20 @@ private void testWriteToHoodieWithCluster(
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();

boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());

DataStream<RowData> dataStream;
if (isMor) {
TextInputFormat format = new TextInputFormat(new Path(sourcePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");

dataStream = execEnv
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(1);
} else {
dataStream = execEnv
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4);
}
DataStream<RowData> dataStream = execEnv
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4);

OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
execEnv.addOperator(pipeline.getTransformation());

Pipelines.cluster(conf, rowType, pipeline);
execEnv.execute(jobName);
execute(execEnv, false, jobName);

TestData.checkWrittenDataCOW(tempFile, expected);
}
Expand Down Expand Up @@ -384,8 +367,6 @@ public void testHoodiePipelineBuilderSink() throws Exception {
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
Configuration conf = Configuration.fromMap(options);
// Read from file source
Expand All @@ -405,16 +386,15 @@ public void testHoodiePipelineBuilderSink() throws Exception {

TextInputFormat format = new TextInputFormat(new Path(sourcePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");

DataStream dataStream = execEnv
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(1);


.setParallelism(4);

//sink to hoodie table use low-level sink api.
HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
Expand All @@ -429,7 +409,7 @@ public void testHoodiePipelineBuilderSink() throws Exception {

builder.sink(dataStream, false);

execute(execEnv, true, "Api_Sink_Test");
execute(execEnv, false, "Api_Sink_Test");
TestData.checkWrittenDataCOW(tempFile, EXPECTED);
}
}

0 comments on commit 1879efa

Please sign in to comment.