diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index c0cd386793b7..d6e70f16eaf7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -34,6 +34,7 @@
 import org.apache.hudi.sync.common.util.ConfigUtils;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
@@ -70,6 +71,7 @@
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.types.DataType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -402,17 +404,22 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
     String path = hiveTable.getSd().getLocation();
     Map<String, String> parameters = hiveTable.getParameters();
     Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
+    String pkColumnsStr = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
+    List<String> pkColumns = StringUtils.isNullOrEmpty(pkColumnsStr)
+        ? null : StringUtils.split(pkColumnsStr, ",");
     org.apache.flink.table.api.Schema schema;
     if (latestTableSchema != null) {
+      // if the table is initialized from spark, the write schema is nullable for pk columns.
+      DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
+          AvroSchemaConverter.convertToDataType(latestTableSchema), pkColumns);
       org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
-          .fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
+          .fromRowDataType(tableDataType);
       String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
-      String pkColumns = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
       if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
         // pkColumns expect not to be null
-        builder.primaryKeyNamed(pkConstraintName, StringUtils.split(pkColumns, ","));
+        builder.primaryKeyNamed(pkConstraintName, pkColumns);
       } else if (pkColumns != null) {
-        builder.primaryKey(StringUtils.split(pkColumns, ","));
+        builder.primaryKey(pkColumns);
       }
       schema = builder.build();
     } else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
index e91432b5e359..c772dc853917 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.util;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -26,10 +27,14 @@
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
+import javax.annotation.Nullable;
+
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 /**
  * Utilities for {@link org.apache.flink.table.types.DataType}.
@@ -123,4 +128,43 @@ public static Object resolvePartition(String partition, DataType type) {
           "Can not convert %s to type %s for partition value", partition, type));
     }
   }
+
+  /**
+   * Ensures the given columns of the row data type are not nullable (for example, the primary keys).
+   *
+   * @param dataType  The row data type, whose logical type must be a row type
+   * @param pkColumns The primary keys
+   * @return a new row data type if any column nullability is tweaked or the original data type
+   */
+  public static DataType ensureColumnsAsNonNullable(DataType dataType, @Nullable List<String> pkColumns) {
+    if (pkColumns == null || pkColumns.isEmpty()) {
+      return dataType;
+    }
+    LogicalType dataTypeLogicalType = dataType.getLogicalType();
+    if (!(dataTypeLogicalType instanceof RowType)) {
+      throw new RuntimeException("The datatype to be converted must be row type, but this type is :" + dataTypeLogicalType.getClass());
+    }
+    RowType rowType = (RowType) dataTypeLogicalType;
+    List<DataType> originalFieldTypes = dataType.getChildren();
+    List<String> fieldNames = rowType.getFieldNames();
+    List<DataType> fieldTypes = new ArrayList<>();
+    boolean tweaked = false;
+    for (int i = 0; i < fieldNames.size(); i++) {
+      if (pkColumns.contains(fieldNames.get(i)) && rowType.getTypeAt(i).isNullable()) {
+        fieldTypes.add(originalFieldTypes.get(i).notNull());
+        tweaked = true;
+      } else {
+        fieldTypes.add(originalFieldTypes.get(i));
+      }
+    }
+    if (!tweaked) {
+      return dataType;
+    }
+    List<DataTypes.Field> fields = new ArrayList<>();
+    for (int i = 0; i < fieldNames.size(); i++) {
+      fields.add(DataTypes.FIELD(fieldNames.get(i), fieldTypes.get(i)));
+    }
+    return DataTypes.ROW(fields.stream().toArray(DataTypes.Field[]::new)).notNull();
+  }
+}