diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 987ae10fe75ce..41e0abd6baf10 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -149,7 +149,7 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
       }
       if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
         conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE);
-      } else {
+      } else if (!preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)) {
         throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
             + "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
       }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index e40c40c996092..afe3e809b0c0a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -461,6 +461,30 @@ void testWriteAndReadWithProctimeSequence(HoodieTableType tableType, boolean hiv
     assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
   }
 
+  @ParameterizedTest
+  @MethodSource("tableTypeAndPartitioningParams")
+  void testWriteAndReadWithProctimeSequenceWithTsColumnExisting(HoodieTableType tableType, boolean hiveStylePartitioning) {
+    TableEnvironment tableEnv = batchTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .field("uuid varchar(20)")
+        .field("name varchar(10)")
+        .field("age int")
+        .field("ts timestamp(3)") // use the default precombine field 'ts'
+        .field("`partition` varchar(10)")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .option(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 0a7a622049727..1fca92567a553 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -152,6 +152,20 @@ void testRequiredOptionsForSource() {
     HoodieTableSink tableSink5 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext5);
     assertThat(tableSource5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(EventTimeAvroPayload.class.getName()));
     assertThat(tableSink5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(EventTimeAvroPayload.class.getName()));
+
+    // given pk and set pre combine key to no_precombine will be ok
+    ResolvedSchema schema5 = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE);
+    final MockContext sourceContext6 = MockContext.getInstance(this.conf, schema5, "f2");
+
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext6));
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6));
   }
 
   @Test