Skip to content

Commit

Permalink
[HUDI-4543] Support natural order when table schema contains a field named 'ts' (apache#6246)
Browse files Browse the repository at this point in the history

* Be able to disable the precombine field when the table schema contains a field named 'ts'

Co-authored-by: jian yonghua <jianyonghua@163.com>
  • Loading branch information
2 people authored and fengjian committed Apr 5, 2023
1 parent 40c89af commit 5798c58
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
}
if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE);
} else {
} else if (!preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)) {
throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
+ "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,30 @@ void testWriteAndReadWithProctimeSequence(HoodieTableType tableType, boolean hiv
assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
}

/**
 * Verifies that a table whose schema contains a column named 'ts' (the default
 * precombine field name) can still be written and read when the precombine
 * field is explicitly disabled via {@link FlinkOptions#NO_PRE_COMBINE}.
 */
@ParameterizedTest
@MethodSource("tableTypeAndPartitioningParams")
void testWriteAndReadWithProctimeSequenceWithTsColumnExisting(HoodieTableType tableType, boolean hiveStylePartitioning) {
TableEnvironment env = batchTableEnv;
// Build the DDL: the 'ts' column exists but precombine is disabled.
String createTableDdl = sql("t1")
.field("uuid varchar(20)")
.field("name varchar(10)")
.field("age int")
.field("ts timestamp(3)") // use the default precombine field 'ts'
.field("`partition` varchar(10)")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.option(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE)
.end();
env.executeSql(createTableDdl);

// Insert multiple rows sharing the same key; with no precombine field the
// natural (proctime) order decides which record wins.
execInsertSql(env, TestSQL.INSERT_SAME_KEY_T1);

List<Row> rows = CollectionUtil.iterableToList(
() -> env.sqlQuery("select * from t1").execute().collect());
assertRowsEquals(rows, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
}

@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,20 @@ void testRequiredOptionsForSource() {
HoodieTableSink tableSink5 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext5);
assertThat(tableSource5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(EventTimeAvroPayload.class.getName()));
assertThat(tableSink5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(EventTimeAvroPayload.class.getName()));

// given pk and set pre combine key to no_precombine will be ok
ResolvedSchema schema5 = SchemaBuilder.instance()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE);
final MockContext sourceContext6 = MockContext.getInstance(this.conf, schema5, "f2");

assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext6));
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6));
}

@Test
Expand Down

0 comments on commit 5798c58

Please sign in to comment.