Skip to content

Commit

Permalink
[HUDI-4665] Flipping default for "ignore failed batch" config in streaming sink to false (apache#6450)
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan authored and voonhous committed Oct 7, 2022
1 parent 7fcc64c commit 10975fd
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ private FlinkOptions() {
.booleanType()
.defaultValue(false)
.withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
+ "By default false");
+ "By default false. Turning this on, could hide the write status errors while the spark checkpoint moves ahead. \n"
+ " So, would recommend users to use this with caution.");

public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
.key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,14 @@ object DataSourceWriteOptions {
.withDocumentation(" Config to indicate how long (by millisecond) before a retry should issued for failed microbatch")

/**
* By default true (in favor of streaming progressing over data integrity)
* By default false. Users who prefer streaming progress over data integrity can set this to true.
*/
val STREAMING_IGNORE_FAILED_BATCH: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.streaming.ignore.failed.batch")
.defaultValue("true")
.defaultValue("false")
.withDocumentation("Config to indicate whether to ignore any non exception error (e.g. writestatus error)"
+ " within a streaming microbatch")
+ " within a streaming microbatch. Turning this on, could hide the write status errors while the spark checkpoint moves ahead." +
"So, would recommend users to use this with caution.")

val META_SYNC_CLIENT_TOOL_CLASS_NAME: ConfigProperty[String] = ConfigProperty
.key("hoodie.meta.sync.client.tool.class")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
}
log.error(s"Micro batch id=$batchId threw following exception: ", e)
if (ignoreFailedBatch) {
log.info(s"Ignore the exception and move on streaming as per " +
log.warn(s"Ignore the exception and move on streaming as per " +
s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
Success((true, None, None))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ public void stream(Dataset<Row> streamingInput, String operationType, String che
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true")
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
.option(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key(), "false")
.option(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH().key(),"true")
.option(HoodieWriteConfig.TBL_NAME.key(), tableName).option("checkpointLocation", checkpointLocation)
.outputMode(OutputMode.Append());

Expand Down

0 comments on commit 10975fd

Please sign in to comment.