diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index a057c02f2cca..4383b42e9f8d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -178,9 +179,12 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
   /**
    * Create Hive field schemas from Flink table schema including the hoodie metadata fields.
    */
-  public static List<FieldSchema> toHiveFieldSchema(TableSchema schema) {
+  public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) {
     List<FieldSchema> columns = new ArrayList<>();
-    for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) {
+    Collection<String> metaFields = withOperationField
+        ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence
+        : HoodieRecord.HOODIE_META_COLUMNS;
+    for (String metaField : metaFields) {
       columns.add(new FieldSchema(metaField, "string", null));
     }
     columns.addAll(createHiveColumns(schema));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index c0cd386793b7..00846c8143e0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -546,7 +546,8 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
     // because since Hive 3.x, there is validation when altering table,
     // when the metadata fields are synced through the hive sync tool,
     // a compatability issue would be reported.
-    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema());
+    boolean withOperationField = Boolean.parseBoolean(table.getOptions().getOrDefault(FlinkOptions.CHANGELOG_ENABLED.key(), "false"));
+    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField);
 
     // Table columns and partition keys
     CatalogTable catalogTable = (CatalogTable) table;
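
For reviewers, a minimal usage sketch of the new overload (not part of the patch). It assumes the hudi-flink and Hive metastore dependencies are on the classpath; the class name `ToHiveFieldSchemaSketch`, the sample field names, and the printed expectation are illustrative only:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hudi.table.catalog.HiveSchemaUtils;

import java.util.List;

public class ToHiveFieldSchemaSketch {
  public static void main(String[] args) {
    // A toy Flink schema; field names and types are arbitrary.
    TableSchema schema = TableSchema.builder()
        .field("id", DataTypes.INT())
        .field("name", DataTypes.STRING())
        .build();

    // withOperationField = true pulls the meta columns from
    // HOODIE_META_COLUMNS_WITH_OPERATION, which includes _hoodie_operation.
    List<FieldSchema> withOp = HiveSchemaUtils.toHiveFieldSchema(schema, true);
    List<FieldSchema> withoutOp = HiveSchemaUtils.toHiveFieldSchema(schema, false);

    // A changelog-enabled table should end up with exactly one extra column.
    System.out.println(withOp.size() - withoutOp.size()); // expected: 1
  }
}
```

On the inline caution in the first hunk: `HOODIE_META_COLUMNS_WITH_OPERATION` is a set-typed constant, so unlike the `HOODIE_META_COLUMNS` list its iteration order is not necessarily the declaration order, which is why the added comment warns that the meta-column sequence may break.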