From 1879efa45d556c87d2cda56fa16d5de535481c06 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Tue, 23 Aug 2022 18:54:57 +0800
Subject: [PATCH] [HUDI-4686] Flip option 'write.ignore.failed' to default false (#6467)

Also fix the flaky test
---
 .../hudi/configuration/FlinkOptions.java      |  6 +--
 .../hudi/sink/ITTestDataStreamWrite.java      | 50 ++++++-------------
 2 files changed, 18 insertions(+), 38 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 65c27084bf408..5c8a89380dc31 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -99,7 +99,7 @@ private FlinkOptions() {
 
   public static final String NO_PRE_COMBINE = "no_precombine";
   public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
-      .key("payload.ordering.field")
+      .key("precombine.field")
       .stringType()
       .defaultValue("ts")
       .withFallbackKeys("write.precombine.field")
@@ -330,9 +330,9 @@ private FlinkOptions() {
   public static final ConfigOption<Boolean> IGNORE_FAILED = ConfigOptions
       .key("write.ignore.failed")
       .booleanType()
-      .defaultValue(true)
+      .defaultValue(false)
       .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
-          + "By default true (in favor of streaming progressing over data integrity)");
+          + "By default false");
 
   public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
       .key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 4862cda07abf6..18b2af3efe67b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -157,12 +157,11 @@ public void testWriteMergeOnReadWithCompaction(String indexType) throws Exceptio
   }
 
   @Test
-  public void testWriteMergeOnReadWithClustering() throws Exception {
+  public void testWriteCopyOnWriteWithClustering() throws Exception {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
     conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.OPERATION, "insert");
-    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
 
     testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
   }
@@ -281,36 +280,20 @@ private void testWriteToHoodieWithCluster(
     String sourcePath = Objects.requireNonNull(Thread.currentThread()
         .getContextClassLoader().getResource("test_source.data")).toString();
 
-    boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
-
-    DataStream<RowData> dataStream;
-    if (isMor) {
-      TextInputFormat format = new TextInputFormat(new Path(sourcePath));
-      format.setFilesFilter(FilePathFilter.createDefaultFilter());
-      TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
-      format.setCharsetName("UTF-8");
-
-      dataStream = execEnv
-          // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
-          .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
-          .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-          .setParallelism(1);
-    } else {
-      dataStream = execEnv
-          // use continuous file source to trigger checkpoint
-          .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
-          .name("continuous_file_source")
-          .setParallelism(1)
-          .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-          .setParallelism(4);
-    }
+    DataStream<RowData> dataStream = execEnv
+        // use continuous file source to trigger checkpoint
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
+        .name("continuous_file_source")
+        .setParallelism(1)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4);
 
     OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
     DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
     execEnv.addOperator(pipeline.getTransformation());
 
     Pipelines.cluster(conf, rowType, pipeline);
-    execEnv.execute(jobName);
+    execute(execEnv, false, jobName);
 
     TestData.checkWrittenDataCOW(tempFile, expected);
   }
@@ -384,8 +367,6 @@ public void testHoodiePipelineBuilderSink() throws Exception {
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
-    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
     options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
     Configuration conf = Configuration.fromMap(options);
     // Read from file source
@@ -405,16 +386,15 @@ public void testHoodiePipelineBuilderSink() throws Exception {
 
     TextInputFormat format = new TextInputFormat(new Path(sourcePath));
     format.setFilesFilter(FilePathFilter.createDefaultFilter());
-    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
     format.setCharsetName("UTF-8");
 
     DataStream<RowData> dataStream = execEnv
-        // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
-        .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+        // use continuous file source to trigger checkpoint
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
+        .name("continuous_file_source")
+        .setParallelism(1)
         .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-        .setParallelism(1);
-
-
+        .setParallelism(4);
 
     //sink to hoodie table use low-level sink api.
     HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
@@ -429,7 +409,7 @@ public void testHoodiePipelineBuilderSink() throws Exception {
 
     builder.sink(dataStream, false);
 
-    execute(execEnv, true, "Api_Sink_Test");
+    execute(execEnv, false, "Api_Sink_Test");
     TestData.checkWrittenDataCOW(tempFile, EXPECTED);
   }
 }
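
Usage note (a sketch, not part of the patch): with the flipped default, a non-exception write error (e.g. a failed writestatus) inside a checkpoint batch is no longer ignored and the job fails instead of silently progressing. Below is a minimal example of explicitly opting back into the old behavior, using only the FlinkOptions fields and option keys that appear in the diff above; the class name and table path are illustrative placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class IgnoreFailedOptInSketch {
  public static void main(String[] args) {
    // Programmatic style: re-enable the lenient behavior that was the default before this patch.
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hudi/t1");   // hypothetical table path
    conf.setBoolean(FlinkOptions.IGNORE_FAILED, true);   // default was true before HUDI-4686, now false

    // Option-map style, e.g. for a HoodiePipeline builder options map or a CREATE TABLE ... WITH clause.
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "/tmp/hudi/t1");    // hypothetical table path
    options.put(FlinkOptions.IGNORE_FAILED.key(), "true");   // key is "write.ignore.failed"
  }
}

Either form simply flips the option back; per the description removed in this patch, the new default trades uninterrupted streaming progress for data integrity.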