diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 24400c5eda90..02f3edcd01de 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -279,7 +279,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
     FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
     if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
       InternalSchema internalSchema;
-      Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
+      Schema avroSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
       if (historySchemaStr.isEmpty()) {
         internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
         internalSchema.setSchemaId(Long.parseLong(instantTime));
@@ -1667,16 +1667,14 @@ public void reOrderColPosition(String colName, String referColName, TableChange.
   private Pair<InternalSchema, HoodieTableMetaClient> getInternalSchemaAndMetaClient() {
     HoodieTableMetaClient metaClient = createMetaClient(true);
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-    Option<InternalSchema> internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
-    if (!internalSchemaOption.isPresent()) {
-      throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()));
-    }
-    return Pair.of(internalSchemaOption.get(), metaClient);
+    InternalSchema internalSchema = getInternalSchema(schemaUtil);
+    return Pair.of(internalSchema, metaClient);
   }
 
   private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient metaClient) {
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-    String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+    InternalSchema oldSchema = getInternalSchema(schemaUtil);
+    String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse(SerDeHelper.inheritSchemas(oldSchema, ""));
     Schema schema = AvroInternalSchemaConverter.convert(newSchema, config.getTableName());
     String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType());
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -1692,10 +1690,20 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m
       throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
     }
     Map<String, String> extraMeta = new HashMap<>();
-    extraMeta.put(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(newSchema.setSchemaId(Long.getLong(instantTime))));
+    extraMeta.put(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(newSchema.setSchemaId(Long.parseLong(instantTime))));
     // try to save history schemas
     FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
     schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
     commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
   }
+
+  private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
+    return schemaUtil.getTableInternalSchemaFromCommitMetadata().orElseGet(() -> {
+      try {
+        return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema());
+      } catch (Exception e) {
+        throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()));
+      }
+    });
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 5ead348140aa..5bbfa03934f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -20,10 +20,20 @@
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.utils.MergingIterator;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.InternalSchemaCache;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -36,13 +46,18 @@
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Helper to read records from previous version of base file and run Merge.
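Editor's note, as a reading aid for the hunk that follows (not part of the patch): the new BaseMergeHelper#getRecordIterator rewrites old base-file records only when the schema those records were written with no longer lines up with the evolved query schema. A minimal standalone sketch of that decision, using hypothetical column lists and a hypothetical rename map rather than Hudi's InternalSchema API:

import java.util.List;
import java.util.Map;

// Illustrative only: mirrors the shape of the check in getRecordIterator below.
// A record rewrite is needed when the file's write schema and the query schema
// no longer share the same columns (added/dropped columns, renames, type changes).
final class RewriteDecisionSketch {

  static boolean needsRewrite(List<String> writeSchemaCols,
                              List<String> querySchemaCols,
                              Map<String, String> renamedCols) {
    boolean sameColumns = writeSchemaCols.size() == querySchemaCols.size()
        && querySchemaCols.containsAll(writeSchemaCols);
    return !sameColumns || !renamedCols.isEmpty();
  }

  public static void main(String[] args) {
    // Base file written with (id, name); table later evolved to (id, full_name, age).
    boolean rewrite = needsRewrite(
        List.of("id", "name"),
        List.of("id", "full_name", "age"),
        Map.of("full_name", "name")); // query column -> column name in the old file
    System.out.println("rewrite before merge: " + rewrite); // prints: rewrite before merge: true
  }
}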
@@ -130,4 +145,48 @@ protected Void getResult() { return null; } } + + protected Iterator getRecordIterator( + HoodieTable table, + HoodieMergeHandle mergeHandle, + HoodieBaseFile baseFile, + HoodieFileReader reader, + Schema readSchema) throws IOException { + Option querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema()); + if (!querySchemaOpt.isPresent()) { + querySchemaOpt = new TableSchemaResolver(table.getMetaClient()).getTableInternalSchemaFromCommitMetadata(); + } + boolean needToReWriteRecord = false; + Map renameCols = new HashMap<>(); + // TODO support bootstrap + if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) { + // check implicitly add columns, and position reorder(spark sql may change cols order) + InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get()); + long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName())); + InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable()); + if (writeInternalSchema.isEmptySchema()) { + throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime)); + } + List colNamesFromQuerySchema = querySchema.getAllColsFullName(); + List colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName(); + List sameCols = colNamesFromWriteSchema.stream() + .filter(f -> colNamesFromQuerySchema.contains(f) + && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f) + && writeInternalSchema.findIdByName(f) != -1 + && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList()); + readSchema = AvroInternalSchemaConverter + .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName()); + Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName()); + needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size() + || SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; + if (needToReWriteRecord) { + renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema); + } + } + if (needToReWriteRecord) { + return HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols); + } else { + return reader.getRecordIterator(readSchema); + } + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 5d1a55453d16..a7665c894781 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -18,25 +18,14 @@ package org.apache.hudi.table.action.commit; -import org.apache.avro.SchemaCompatibility; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import 
org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.internal.schema.InternalSchema; -import org.apache.hudi.internal.schema.action.InternalSchemaMerger; -import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; -import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils; -import org.apache.hudi.internal.schema.utils.SerDeHelper; -import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -52,10 +41,6 @@ import java.io.IOException; import java.util.Iterator; -import java.util.List; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; public class HoodieMergeHelper extends BaseMergeHelper>, HoodieData, HoodieData> { @@ -94,45 +79,12 @@ public void runMerge(HoodieTable>, HoodieData wrapper = null; HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); - Option querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema()); - boolean needToReWriteRecord = false; - Map renameCols = new HashMap<>(); - // TODO support bootstrap - if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) { - // check implicitly add columns, and position reorder(spark sql may change cols order) - InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get()); - long commitInstantTime = Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName())); - InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable()); - if (writeInternalSchema.isEmptySchema()) { - throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime)); - } - List colNamesFromQuerySchema = querySchema.getAllColsFullName(); - List colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName(); - List sameCols = colNamesFromWriteSchema.stream() - .filter(f -> colNamesFromQuerySchema.contains(f) - && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f) - && writeInternalSchema.findIdByName(f) != -1 - && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList()); - readSchema = AvroInternalSchemaConverter - .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName()); - Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName()); - needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size() - || SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; - if (needToReWriteRecord) { - renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema); - } - } - try { final Iterator readerIterator; if 
(baseFile.getBootstrapBaseFile().isPresent()) { readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); } else { - if (needToReWriteRecord) { - readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols); - } else { - readerIterator = reader.getRecordIterator(readSchema); - } + readerIterator = getRecordIterator(table, mergeHandle, baseFile, reader, readSchema); } ThreadLocal encoderCache = new ThreadLocal<>(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java index 9070f2dbacc5..4890a0f9de84 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java @@ -85,7 +85,7 @@ public void runMerge(HoodieTable>, List, List if (baseFile.getBootstrapBaseFile().isPresent()) { readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); } else { - readerIterator = reader.getRecordIterator(readSchema); + readerIterator = getRecordIterator(table, mergeHandle, baseFile, reader, readSchema); } ThreadLocal encoderCache = new ThreadLocal<>(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 8ea34d6f2fa0..83607d1758e5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; +import org.apache.hudi.internal.schema.InternalSchema; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; @@ -37,8 +38,8 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, - LogRecordScannerCallback callback, Option instantRange) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, false); + LogRecordScannerCallback callback, Option instantRange, InternalSchema internalSchema) { + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, false, true, Option.empty(), internalSchema); this.callback = callback; } @@ -84,6 +85,7 @@ public static class Builder extends AbstractHoodieLogRecordReader.Builder { private Option instantRange = Option.empty(); // specific configurations private LogRecordScannerCallback callback; + private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema(); public Builder withFileSystem(FileSystem fs) { this.fs = fs; @@ -135,10 +137,15 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) { return this; } + public Builder withInternalSchema(InternalSchema internalSchema) { + this.internalSchema = internalSchema; + 
return this; + } + @Override public HoodieUnMergedLogRecordScanner build() { return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, - latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange); + latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange, internalSchema); } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index b1a3372e0893..9f83d1005eec 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -21,6 +21,7 @@ import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy; import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; @@ -96,6 +97,12 @@ private FlinkOptions() { + "The semantics is best effort because the compaction job would finally merge all changes of a record into one.\n" + " default false to have UPSERT semantics"); + public static final ConfigOption SCHEMA_EVOLUTION_ENABLED = ConfigOptions + .key(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key()) + .booleanType() + .defaultValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue()) + .withDescription(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.doc()); + // ------------------------------------------------------------------------ // Metadata table Options // ------------------------------------------------------------------------ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 10d46abc94ca..d6f4dd532a17 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -37,6 +37,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction; import org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.table.HoodieTable; @@ -197,7 +198,8 @@ protected void loadRecords(String partitionPath) throws Exception { if (latestCommitTime.isPresent()) { BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat()); - Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema(); + TableSchemaResolver schemaResolver = new TableSchemaResolver(this.hoodieTable.getMetaClient()); + Schema schema = schemaResolver.getTableAvroSchema(); List fileSlices = this.hoodieTable.getSliceView() .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp()) @@ -229,7 +231,9 @@ protected void loadRecords(String partitionPath) throws Exception { .filter(logFile -> isValidFile(logFile.getFileStatus())) 
.map(logFile -> logFile.getPath().toString()) .collect(toList()); - HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(), + InternalSchema internalSchema = schemaResolver.getTableInternalSchemaFromCommitMetadata() + .orElse(InternalSchema.getEmptyInternalSchema()); + HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, internalSchema, latestCommitTime.get().getTimestamp(), writeConfig, hadoopConf); try { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 2034cb322eb8..237309e3e0ec 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -453,6 +453,7 @@ private MergeOnReadInputFormat mergeOnReadInputFormat( this.requiredPos, this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value + this.conf, getParquetConf(this.conf, this.hadoopConf), this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE) ); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java new file mode 100644 index 000000000000..d635819aa8d3 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; +import org.apache.hudi.util.AvroSchemaConverter; + +import org.apache.avro.Schema; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; + +/** + * CastMap is responsible for conversion of flink types when full schema evolution enabled. + */ +public final class CastMap implements Serializable { + private static final long serialVersionUID = 1L; + + // Maps position to corresponding cast + private final Map castMap = new HashMap<>(); + + /** + * Creates CastMap by comparing two schemes. Cast of a specific column is created if its type has changed. 
+ */ + public static CastMap of(String tableName, InternalSchema querySchema, InternalSchema actualSchema) { + DataType queryType = internalSchemaToDataType(tableName, querySchema); + DataType actualType = internalSchemaToDataType(tableName, actualSchema); + CastMap castMap = new CastMap(); + InternalSchemaUtils.collectTypeChangedCols(querySchema, actualSchema).entrySet().stream() + .filter(e -> !isSameType(e.getValue().getLeft(), e.getValue().getRight())) + .forEach(e -> { + int pos = e.getKey(); + LogicalType target = queryType.getChildren().get(pos).getLogicalType(); + LogicalType actual = actualType.getChildren().get(pos).getLogicalType(); + castMap.add(pos, actual, target); + }); + return castMap; + } + + public Object castIfNeeded(int pos, Object val) { + Cast cast = castMap.get(pos); + if (cast == null) { + return val; + } + return cast.convert(val); + } + + public boolean containsAnyPos(int[] positions) { + return Arrays.stream(positions).anyMatch(castMap.keySet()::contains); + } + + public CastMap withNewPositions(int[] oldPositions, int[] newPositions) { + Preconditions.checkArgument(oldPositions.length == newPositions.length); + CastMap newCastMap = new CastMap(); + for (int i = 0; i < oldPositions.length; i++) { + Cast cast = castMap.get(oldPositions[i]); + if (cast != null) { + newCastMap.add(newPositions[i], cast); + } + } + return newCastMap; + } + + @VisibleForTesting + void add(int pos, LogicalType fromType, LogicalType toType) { + Function conversion = getConversion(fromType, toType); + if (conversion == null) { + throw new IllegalArgumentException(String.format("Cannot create cast %s => %s at pos %s", fromType, toType, pos)); + } + add(pos, new Cast(fromType, toType, conversion)); + } + + private Function getConversion(LogicalType fromType, LogicalType toType) { + LogicalTypeRoot from = fromType.getTypeRoot(); + LogicalTypeRoot to = toType.getTypeRoot(); + switch (to) { + case BIGINT: { + // Integer => Long + if (from == INTEGER) { + return val -> ((Number) val).longValue(); + } + break; + } + case FLOAT: { + // Integer => Float + // Long => Float + if (from == INTEGER || from == BIGINT) { + return val -> ((Number) val).floatValue(); + } + break; + } + case DOUBLE: { + // Integer => Double + // Long => Double + if (from == INTEGER || from == BIGINT) { + return val -> ((Number) val).doubleValue(); + } + // Float => Double + if (from == FLOAT) { + return val -> Double.parseDouble(val.toString()); + } + break; + } + case DECIMAL: { + // Integer => Decimal + // Long => Decimal + // Double => Decimal + if (from == INTEGER || from == BIGINT || from == DOUBLE) { + return val -> toDecimalData((Number) val, toType); + } + // Float => Decimal + if (from == FLOAT) { + return val -> toDecimalData(Double.parseDouble(val.toString()), toType); + } + // String => Decimal + if (from == VARCHAR) { + return val -> toDecimalData(Double.parseDouble(val.toString()), toType); + } + // Decimal => Decimal + if (from == DECIMAL) { + return val -> toDecimalData(((DecimalData) val).toBigDecimal(), toType); + } + break; + } + case VARCHAR: { + // Integer => String + // Long => String + // Float => String + // Double => String + // Decimal => String + if (from == INTEGER + || from == BIGINT + || from == FLOAT + || from == DOUBLE + || from == DECIMAL) { + return val -> new BinaryStringData(String.valueOf(val)); + } + // Date => String + if (from == DATE) { + return val -> new BinaryStringData(LocalDate.ofEpochDay(((Integer) val).longValue()).toString()); + } + break; + } + case DATE: { + // String => 
Date + if (from == VARCHAR) { + return val -> (int) LocalDate.parse(val.toString()).toEpochDay(); + } + break; + } + default: + } + return null; + } + + private void add(int pos, Cast cast) { + castMap.put(pos, cast); + } + + private DecimalData toDecimalData(Number val, LogicalType decimalType) { + BigDecimal valAsDecimal = BigDecimal.valueOf(val.doubleValue()); + return toDecimalData(valAsDecimal, decimalType); + } + + private DecimalData toDecimalData(BigDecimal valAsDecimal, LogicalType decimalType) { + return DecimalData.fromBigDecimal( + valAsDecimal, + ((DecimalType) decimalType).getPrecision(), + ((DecimalType) decimalType).getScale()); + } + + private static boolean isSameType(Type left, Type right) { + if (left instanceof Types.DecimalType && right instanceof Types.DecimalType) { + return left.equals(right); + } + return left.typeId().equals(right.typeId()); + } + + private static DataType internalSchemaToDataType(String tableName, InternalSchema internalSchema) { + Schema schema = AvroInternalSchemaConverter.convert(internalSchema, tableName); + return AvroSchemaConverter.convertToDataType(schema); + } + + /** + * {@link Cast#from} and {@link Cast#to} are redundant due to {@link Cast#convert(Object)} determines conversion. + * However, it is convenient to debug {@link CastMap} when {@link Cast#toString()} prints types. + */ + private static final class Cast implements Serializable { + private static final long serialVersionUID = 1L; + + private final LogicalType from; + private final LogicalType to; + private final Function conversion; + + Cast(LogicalType from, LogicalType to, Function conversion) { + this.from = from; + this.to = to; + this.conversion = conversion; + } + + Object convert(Object val) { + return conversion.apply(val); + } + + @Override + public String toString() { + return from + " => " + to; + } + } + + @Override + public String toString() { + return castMap.entrySet().stream() + .map(e -> e.getKey() + ": " + e.getValue()) + .collect(Collectors.joining(", ", "{", "}")); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 8adbde355cf7..8176b9a4f401 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -32,6 +32,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.StreamerUtil; @@ -121,6 +122,7 @@ private static Object getVal(IndexedRecord record, int pos) { public static HoodieMergedLogRecordScanner logScanner( MergeOnReadInputSplit split, Schema logSchema, + InternalSchema internalSchema, org.apache.flink.configuration.Configuration flinkConf, Configuration hadoopConf) { HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(flinkConf); @@ -130,6 +132,7 @@ public static HoodieMergedLogRecordScanner logScanner( .withBasePath(split.getTablePath()) .withLogFilePaths(split.getLogPaths().get()) .withReaderSchema(logSchema) + .withInternalSchema(internalSchema) .withLatestInstantTime(split.getLatestCommit()) .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled()) 
.withReverseReader(false) @@ -146,6 +149,7 @@ public static HoodieMergedLogRecordScanner logScanner( private static HoodieUnMergedLogRecordScanner unMergedLogScanner( MergeOnReadInputSplit split, Schema logSchema, + InternalSchema internalSchema, org.apache.flink.configuration.Configuration flinkConf, Configuration hadoopConf, HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) { @@ -155,6 +159,7 @@ private static HoodieUnMergedLogRecordScanner unMergedLogScanner( .withBasePath(split.getTablePath()) .withLogFilePaths(split.getLogPaths().get()) .withReaderSchema(logSchema) + .withInternalSchema(internalSchema) .withLatestInstantTime(split.getLatestCommit()) .withReadBlocksLazily( string2Boolean( @@ -185,6 +190,7 @@ public static class BoundedMemoryRecords { public BoundedMemoryRecords( MergeOnReadInputSplit split, Schema logSchema, + InternalSchema internalSchema, Configuration hadoopConf, org.apache.flink.configuration.Configuration flinkConf) { this.executor = new BoundedInMemoryExecutor<>( @@ -196,7 +202,7 @@ public BoundedMemoryRecords( Functions.noop()); // Consumer of this record reader this.iterator = this.executor.getQueue().iterator(); - this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, flinkConf, hadoopConf, + this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, internalSchema, flinkConf, hadoopConf, record -> executor.getQueue().insertRecord(record)); // Start reading and buffering this.executor.startProducers(); @@ -226,6 +232,7 @@ public void close() { public static HoodieMergedLogRecordScanner logScanner( List logPaths, Schema logSchema, + InternalSchema internalSchema, String latestInstantTime, HoodieWriteConfig writeConfig, Configuration hadoopConf) { @@ -235,6 +242,7 @@ public static HoodieMergedLogRecordScanner logScanner( .withBasePath(basePath) .withLogFilePaths(logPaths) .withReaderSchema(logSchema) + .withInternalSchema(internalSchema) .withLatestInstantTime(latestInstantTime) .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled()) .withReverseReader(false) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolutionContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolutionContext.java new file mode 100644 index 000000000000..18284daf8c76 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolutionContext.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.InternalSchemaCache; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.internal.schema.action.InternalSchemaMerger; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * This class is responsible for calculating names and types of fields that are actual at a certain point in time. + * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time. + * If type of field is changed, its old type will be returned, and projection will be created that will convert the old type to the queried one. + */ +public final class SchemaEvolutionContext implements Serializable { + private static final long serialVersionUID = 1L; + + private final HoodieTableMetaClient metaClient; + private final InternalSchema querySchema; + + public static Option of(Configuration conf) { + if (conf.getBoolean(FlinkOptions.SCHEMA_EVOLUTION_ENABLED)) { + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + return new TableSchemaResolver(metaClient) + .getTableInternalSchemaFromCommitMetadata() + .map(schema -> new SchemaEvolutionContext(metaClient, schema)); + } else { + return Option.empty(); + } + } + + public SchemaEvolutionContext(HoodieTableMetaClient metaClient, InternalSchema querySchema) { + this.metaClient = metaClient; + this.querySchema = querySchema; + } + + public InternalSchema getQuerySchema() { + return querySchema; + } + + public InternalSchema getActualSchema(FileInputSplit fileSplit) { + return getActualSchema(FSUtils.getCommitTime(fileSplit.getPath().getName())); + } + + public InternalSchema getActualSchema(MergeOnReadInputSplit split) { + String commitTime = split.getBasePath() + .map(FSUtils::getCommitTime) + .orElse(split.getLatestCommit()); + return getActualSchema(commitTime); + } + + public List getFieldNames(InternalSchema internalSchema) { + return internalSchema.columns().stream().map(Types.Field::name).collect(Collectors.toList()); + } + + public List getFieldTypes(InternalSchema internalSchema) { + return AvroSchemaConverter.convertToDataType( + AvroInternalSchemaConverter.convert(internalSchema, getTableName())).getChildren(); + } + + public CastMap getCastMap(InternalSchema querySchema, InternalSchema actualSchema) { + return CastMap.of(getTableName(), querySchema, actualSchema); + } + + public static LogicalType[] project(List fieldTypes, int[] selectedFields) { + return Arrays.stream(selectedFields) + .mapToObj(pos -> fieldTypes.get(pos).getLogicalType()) + .toArray(LogicalType[]::new); + } + + private InternalSchema getActualSchema(String commitTime) { + InternalSchema fileSchema = 
InternalSchemaCache.searchSchemaAndCache(Long.parseLong(commitTime), metaClient, false); + return new InternalSchemaMerger(fileSchema, getQuerySchema(), true, true).mergeSchema(); + } + + private String getTableName() { + return metaClient.getTableConfig().getTableName(); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java index 719669b532a4..8fcdfde844fd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java @@ -19,6 +19,10 @@ package org.apache.hudi.table.format.cow; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.table.format.CastMap; +import org.apache.hudi.table.format.SchemaEvolutionContext; import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.hudi.util.DataTypeUtils; @@ -29,6 +33,7 @@ import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.utils.SerializableConfiguration; +import org.apache.flink.table.data.ColumnarRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.PartitionPathUtils; @@ -36,6 +41,8 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.util.RowDataCastProjection; +import org.apache.hudi.util.RowDataProjection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +54,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Set; +import java.util.stream.IntStream; + +import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS; /** * An implementation of {@link FileInputFormat} to read {@link RowData} records @@ -77,6 +87,9 @@ public class CopyOnWriteInputFormat extends FileInputFormat { private transient ParquetColumnarRowSplitReader reader; private transient long currentReadCount; + private final Option schemaEvolutionContext; + private Option projection; + /** * Files filter for determining what files/directories should be included. 
*/ @@ -89,6 +102,7 @@ public CopyOnWriteInputFormat( int[] selectedFields, String partDefaultName, long limit, + org.apache.flink.configuration.Configuration flinkConf, Configuration conf, boolean utcTimestamp) { super.setFilePaths(paths); @@ -99,10 +113,36 @@ public CopyOnWriteInputFormat( this.selectedFields = selectedFields; this.conf = new SerializableConfiguration(conf); this.utcTimestamp = utcTimestamp; + this.schemaEvolutionContext = SchemaEvolutionContext.of(flinkConf); } @Override public void open(FileInputSplit fileSplit) throws IOException { + String[] actualFieldNames; + DataType[] actualFieldTypes; + if (schemaEvolutionContext.isPresent()) { + SchemaEvolutionContext context = schemaEvolutionContext.get(); + InternalSchema actualSchema = context.getActualSchema(fileSplit); + List fieldTypes = context.getFieldTypes(actualSchema); + CastMap castMap = context.getCastMap(context.getQuerySchema(), actualSchema); + int[] selectedFields = Arrays.stream(this.selectedFields).map(pos -> pos + HOODIE_META_COLUMNS.size()).toArray(); + if (castMap.containsAnyPos(selectedFields)) { + int[] requiredFields = IntStream.range(0, this.selectedFields.length).toArray(); + projection = Option.of(new RowDataCastProjection( + SchemaEvolutionContext.project(fieldTypes, selectedFields), + requiredFields, + castMap.withNewPositions(selectedFields, requiredFields))); + } else { + projection = Option.empty(); + } + actualFieldNames = context.getFieldNames(actualSchema).stream().skip(HOODIE_META_COLUMNS.size()).toArray(String[]::new); + actualFieldTypes = fieldTypes.stream().skip(HOODIE_META_COLUMNS.size()).toArray(DataType[]::new); + } else { + actualFieldNames = fullFieldNames; + actualFieldTypes = fullFieldTypes; + projection = Option.empty(); + } + // generate partition specs. 
List fieldNameList = Arrays.asList(fullFieldNames); LinkedHashMap partSpec = PartitionPathUtils.extractPartitionSpecFromPath( @@ -116,8 +156,8 @@ public void open(FileInputSplit fileSplit) throws IOException { utcTimestamp, true, conf.conf(), - fullFieldNames, - fullFieldTypes, + actualFieldNames, + actualFieldTypes, partObjects, selectedFields, 2048, @@ -276,7 +316,8 @@ public boolean reachedEnd() throws IOException { @Override public RowData nextRecord(RowData reuse) { currentReadCount++; - return reader.nextRecord(); + ColumnarRowData rowData = reader.nextRecord(); + return projection.map(pr -> pr.project(rowData)).orElse(rowData); } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 76e9e60ee0dc..66e7a7ef93df 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -29,13 +29,17 @@ import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.table.format.CastMap; import org.apache.hudi.table.format.FilePathUtils; import org.apache.hudi.table.format.FormatUtils; +import org.apache.hudi.table.format.SchemaEvolutionContext; import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.DataTypeUtils; +import org.apache.hudi.util.RowDataCastProjection; import org.apache.hudi.util.RowDataProjection; import org.apache.hudi.util.RowDataToAvroConverters; import org.apache.hudi.util.StringToRowDataConverter; @@ -135,6 +139,12 @@ public class MergeOnReadInputFormat */ private boolean closed = true; + private final Option schemaEvolutionContext; + private List actualFieldNames; + private List actualFieldTypes; + private InternalSchema actualSchema; + private InternalSchema querySchema; + private MergeOnReadInputFormat( Configuration conf, MergeOnReadTableState tableState, @@ -152,6 +162,7 @@ private MergeOnReadInputFormat( this.requiredPos = tableState.getRequiredPositions(); this.limit = limit; this.emitDelete = emitDelete; + this.schemaEvolutionContext = SchemaEvolutionContext.of(conf); } /** @@ -166,29 +177,56 @@ public void open(MergeOnReadInputSplit split) throws IOException { this.currentReadCount = 0L; this.closed = false; this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf); + if (schemaEvolutionContext.isPresent()) { + SchemaEvolutionContext context = schemaEvolutionContext.get(); + querySchema = context.getQuerySchema(); + actualSchema = context.getActualSchema(split); + actualFieldNames = context.getFieldNames(actualSchema); + actualFieldTypes = context.getFieldTypes(actualSchema); + } else { + querySchema = InternalSchema.getEmptyInternalSchema(); + actualSchema = InternalSchema.getEmptyInternalSchema(); + actualFieldNames = fieldNames; + actualFieldTypes = fieldTypes; + } + if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) { - if (split.getInstantRange() != null) { + if 
(split.getInstantRange().isPresent()) { // base file only with commit time filtering this.iterator = new BaseFileOnlyFilteringIterator( - split.getInstantRange(), - this.tableState.getRequiredRowType(), + split.getInstantRange().get(), getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos))); + int[] positions = IntStream.range(1, requiredPos.length + 1).toArray(); + RowDataProjection projection = getCastProjection(positions) + .orElse(RowDataProjection.instance(tableState.getRequiredRowType(), positions)); + projectRecordIterator(projection); } else { // base file only this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get())); + projectRecordIterator(); } } else if (!split.getBasePath().isPresent()) { // log files only if (OptionsResolver.emitChangelog(conf)) { this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split)); + projectRecordIterator(); } else { this.iterator = new LogFileOnlyIterator(getLogFileIterator(split)); + projectRecordIterator(); } } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) { + RecordIterator baseFileIterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get())); this.iterator = new SkipMergeIterator( - getRequiredSchemaReader(split.getBasePath().get()), + getCastProjection().map(pr -> (RecordIterator) new ProjectionIterator(baseFileIterator, pr)).orElse(baseFileIterator), getLogFileIterator(split)); } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) { + RowDataProjection projection = getCastProjection(requiredPos) + .orElse(RowDataProjection.instance(tableState.getRequiredRowType(), requiredPos)); + Option projectionBeforeMerge = schemaEvolutionContext.map(context -> { + CastMap castMap = context.getCastMap(querySchema, actualSchema); + int[] positions = IntStream.range(0, actualFieldTypes.size()).toArray(); + return new RowDataCastProjection(SchemaEvolutionContext.project(actualFieldTypes, positions), positions, castMap); + }); this.iterator = new MergeIterator( conf, hadoopConf, @@ -197,6 +235,9 @@ public void open(MergeOnReadInputSplit split) throws IOException { this.tableState.getRequiredRowType(), new Schema.Parser().parse(this.tableState.getAvroSchema()), new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()), + this.querySchema, + projection, + projectionBeforeMerge, this.requiredPos, this.emitDelete, this.tableState.getOperationPos(), @@ -305,8 +346,8 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos) this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), true, HadoopConfigurations.getParquetConf(this.conf, hadoopConf), - fieldNames.toArray(new String[0]), - fieldTypes.toArray(new DataType[0]), + actualFieldNames.toArray(new String[0]), + actualFieldTypes.toArray(new DataType[0]), partObjects, requiredPos, 2048, @@ -321,7 +362,7 @@ private ClosableIterator getLogFileIterator(MergeOnReadInputSplit split final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema); final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); - final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf); + final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, querySchema, conf, hadoopConf); final Iterator logRecordsKeyIterator = scanner.getRecords().keySet().iterator(); final int[] 
pkOffset = tableState.getPkOffsetsInRequired(); // flag saying whether the pk semantics has been dropped by user specified @@ -401,7 +442,7 @@ private ClosableIterator getUnMergedLogFileIterator(MergeOnReadInputSpl final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema); final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); - final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf, conf); + final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, actualSchema, hadoopConf, conf); final Iterator> recordsIterator = records.getRecordsIterator(); return new ClosableIterator() { @@ -455,6 +496,31 @@ private interface RecordIterator { void close() throws IOException; } + static class ProjectionIterator implements RecordIterator { + private final RecordIterator iterator; + private final RowDataProjection projection; + + ProjectionIterator(RecordIterator iterator, RowDataProjection projection) { + this.iterator = iterator; + this.projection = projection; + } + + @Override + public boolean reachedEnd() throws IOException { + return iterator.reachedEnd(); + } + + @Override + public RowData nextRecord() { + return projection.project(iterator.nextRecord()); + } + + @Override + public void close() throws IOException { + iterator.close(); + } + } + static class BaseFileOnlyIterator implements RecordIterator { // base file reader private final ParquetColumnarRowSplitReader reader; @@ -488,30 +554,20 @@ static class BaseFileOnlyFilteringIterator implements RecordIterator { // base file reader private final ParquetColumnarRowSplitReader reader; private final InstantRange instantRange; - private final RowDataProjection projection; private RowData currentRecord; - BaseFileOnlyFilteringIterator( - Option instantRange, - RowType requiredRowType, - ParquetColumnarRowSplitReader reader) { + BaseFileOnlyFilteringIterator(InstantRange instantRange, ParquetColumnarRowSplitReader reader) { + this.instantRange = instantRange; this.reader = reader; - this.instantRange = instantRange.orElse(null); - int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray(); - projection = RowDataProjection.instance(requiredRowType, positions); } @Override public boolean reachedEnd() throws IOException { while (!this.reader.reachedEnd()) { currentRecord = this.reader.nextRecord(); - if (instantRange != null) { - boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); - if (isInRange) { - return false; - } - } else { + boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); + if (isInRange) { return false; } } @@ -520,8 +576,7 @@ public boolean reachedEnd() throws IOException { @Override public RowData nextRecord() { - // can promote: no need to project with null instant range - return projection.project(currentRecord); + return currentRecord; } @Override @@ -560,9 +615,9 @@ public void close() { static class SkipMergeIterator implements RecordIterator { // base file reader - private final ParquetColumnarRowSplitReader reader; + private final RecordIterator baseIterator; // iterator for log files - private final ClosableIterator iterator; + private final ClosableIterator logsIterator; // add the flag because the flink ParquetColumnarRowSplitReader is buggy: // method 
#reachedEnd() returns false after it returns true. @@ -571,20 +626,20 @@ static class SkipMergeIterator implements RecordIterator { private RowData currentRecord; - SkipMergeIterator(ParquetColumnarRowSplitReader reader, ClosableIterator iterator) { - this.reader = reader; - this.iterator = iterator; + SkipMergeIterator(RecordIterator baseIterator, ClosableIterator logsIterator) { + this.baseIterator = baseIterator; + this.logsIterator = logsIterator; } @Override public boolean reachedEnd() throws IOException { - if (!readLogs && !this.reader.reachedEnd()) { - currentRecord = this.reader.nextRecord(); + if (!readLogs && !this.baseIterator.reachedEnd()) { + currentRecord = this.baseIterator.nextRecord(); return false; } readLogs = true; - if (this.iterator.hasNext()) { - currentRecord = this.iterator.next(); + if (this.logsIterator.hasNext()) { + currentRecord = this.logsIterator.next(); return false; } return true; @@ -597,11 +652,11 @@ public RowData nextRecord() { @Override public void close() throws IOException { - if (this.reader != null) { - this.reader.close(); + if (this.baseIterator != null) { + this.baseIterator.close(); } - if (this.iterator != null) { - this.iterator.close(); + if (this.logsIterator != null) { + this.logsIterator.close(); } } } @@ -624,6 +679,7 @@ static class MergeIterator implements RecordIterator { private final GenericRecordBuilder recordBuilder; private final RowDataProjection projection; + private final Option projectionBeforeMerge; private final InstantRange instantRange; @@ -644,13 +700,16 @@ static class MergeIterator implements RecordIterator { RowType requiredRowType, Schema tableSchema, Schema requiredSchema, + InternalSchema internalSchema, + RowDataProjection projection, + Option projectionBeforeMerge, int[] requiredPos, boolean emitDelete, int operationPos, ParquetColumnarRowSplitReader reader) { // the reader should be with full schema this.tableSchema = tableSchema; this.reader = reader; - this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf); + this.scanner = FormatUtils.logScanner(split, tableSchema, internalSchema, finkConf, hadoopConf); this.logKeysIterator = scanner.getRecords().keySet().iterator(); this.requiredSchema = requiredSchema; this.requiredPos = requiredPos; @@ -659,7 +718,8 @@ static class MergeIterator implements RecordIterator { this.recordBuilder = new GenericRecordBuilder(requiredSchema); this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); - this.projection = RowDataProjection.instance(requiredRowType, requiredPos); + this.projection = projection; + this.projectionBeforeMerge = projectionBeforeMerge; this.instantRange = split.getInstantRange().orElse(null); } @@ -677,6 +737,7 @@ public boolean reachedEnd() throws IOException { final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString(); if (scanner.getRecords().containsKey(curKey)) { keyToSkip.add(curKey); + currentRecord = projectionBeforeMerge.map(pr -> pr.project(currentRecord)).orElse(currentRecord); Option mergedAvroRecord = mergeRowWithLog(currentRecord, curKey); if (!mergedAvroRecord.isPresent()) { // deleted @@ -817,4 +878,29 @@ private static int[] getRequiredPosWithCommitTime(int[] requiredPos) { public void isEmitDelete(boolean emitDelete) { this.emitDelete = emitDelete; } + + private void projectRecordIterator() { + getCastProjection().ifPresent(this::projectRecordIterator); + } + + private 
void projectRecordIterator(RowDataProjection projection) { + this.iterator = new ProjectionIterator(this.iterator, projection); + } + + private Option getCastProjection() { + return getCastProjection(IntStream.range(0, requiredPos.length).toArray()); + } + + private Option getCastProjection(int[] positions) { + if (schemaEvolutionContext.isPresent()) { + CastMap castMap = schemaEvolutionContext.get().getCastMap(querySchema, actualSchema); + if (castMap.containsAnyPos(requiredPos)) { + return Option.of(new RowDataCastProjection( + SchemaEvolutionContext.project(actualFieldTypes, requiredPos), + positions, + castMap.withNewPositions(requiredPos, IntStream.range(0, requiredPos.length).toArray()))); + } + } + return Option.empty(); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java new file mode 100644 index 000000000000..fbabaeddd815 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.table.format.CastMap; + +import org.apache.flink.table.data.GenericRowData; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; + +/** + * Projects a row in the same way as {@link RowDataProjection}. + * In addition, fields are converted according to the {@link CastMap}.
+ */ +public final class RowDataCastProjection extends RowDataProjection { + private static final long serialVersionUID = 1L; + + private final CastMap castMap; + + public RowDataCastProjection(LogicalType[] types, int[] positions, CastMap castMap) { + super(types, positions); + this.castMap = castMap; + } + + @Override + public RowData project(RowData rowData) { + RowData.FieldGetter[] fieldGetters = getFieldGetters(); + GenericRowData genericRowData = new GenericRowData(fieldGetters.length); + for (int i = 0; i < fieldGetters.length; i++) { + Object val = fieldGetters[i].getFieldOrNull(rowData); + if (val != null) { + val = castMap.castIfNeeded(i, val); + } + genericRowData.setField(i, val); + } + return genericRowData; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java index 8076d982b991..0f0ac2a40884 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java @@ -37,7 +37,7 @@ public class RowDataProjection implements Serializable { private final RowData.FieldGetter[] fieldGetters; - private RowDataProjection(LogicalType[] types, int[] positions) { + protected RowDataProjection(LogicalType[] types, int[] positions) { ValidationUtils.checkArgument(types.length == positions.length, "types and positions should have the equal number"); this.fieldGetters = new RowData.FieldGetter[types.length]; @@ -86,4 +86,8 @@ public Object[] projectAsValues(RowData rowData) { } return values; } + + protected RowData.FieldGetter[] getFieldGetters() { + return fieldGetters; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestSchemaEvolution.java new file mode 100644 index 000000000000..16957b305a45 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestSchemaEvolution.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink; + +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.keygen.ComplexAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.table.HoodieTableFactory; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Preconditions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType.AFTER; +import static org.apache.hudi.utils.TestConfigurations.ROW_TYPE; +import static org.apache.hudi.utils.TestConfigurations.ROW_TYPE_EVOLUTION; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ITTestSchemaEvolution extends AbstractTestBase { + @TempDir File tempFile; + StreamExecutionEnvironment env; + StreamTableEnvironment tEnv; + + String[] expectedMergedResult = new String[] { + "+I[Danny, 10000.1, 23]", + "+I[Stephen, null, 33]", + "+I[Julian, 30000.3, 53]", + "+I[Fabian, null, 31]", + "+I[Sophia, null, 18]", + "+I[Emma, null, 20]", + "+I[Bob, null, 44]", + "+I[Han, null, 56]", + "+I[Alice, 90000.9, unknown]" + }; + + String[] expectedUnMergedResult = new String[] { + "+I[Danny, null, 23]", + "+I[Stephen, null, 33]", + "+I[Julian, null, 53]", + "+I[Fabian, null, 31]", + "+I[Sophia, null, 18]", + "+I[Emma, null, 20]", + "+I[Bob, null, 44]", + "+I[Han, null, 56]", + "+I[Alice, 90000.9, unknown]", + "+I[Danny, 10000.1, 23]", + "+I[Julian, 30000.3, 53]" + }; + + @BeforeEach + public void setUp() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + tEnv = StreamTableEnvironment.create(env); + } + + @Test + public void testCopyOnWriteInputFormat() throws Exception { + testRead(defaultOptionMap(tempFile.getAbsolutePath())); + } + + @Test + public void testMergeOnReadInputFormatBaseFileOnlyIterator() throws Exception { + OptionMap optionMap = defaultOptionMap(tempFile.getAbsolutePath()); + optionMap.put(FlinkOptions.READ_AS_STREAMING.key(), true); + optionMap.put(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST); + testRead(optionMap); + } + + @Test + public void testMergeOnReadInputFormatBaseFileOnlyFilteringIterator() throws Exception { + OptionMap optionMap = defaultOptionMap(tempFile.getAbsolutePath()); + optionMap.put(FlinkOptions.READ_AS_STREAMING.key(), true); + optionMap.put(FlinkOptions.READ_START_COMMIT.key(), 
1); + testRead(optionMap); + } + + @Test + public void testMergeOnReadInputFormatLogFileOnlyIteratorGetLogFileIterator() throws Exception { + OptionMap optionMap = defaultOptionMap(tempFile.getAbsolutePath()); + optionMap.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + testRead(optionMap); + } + + @Test + public void testMergeOnReadInputFormatLogFileOnlyIteratorGetUnMergedLogFileIterator() throws Exception { + OptionMap optionMap = defaultOptionMap(tempFile.getAbsolutePath()); + optionMap.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + optionMap.put(FlinkOptions.READ_AS_STREAMING.key(), true); + optionMap.put(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST); + optionMap.put(FlinkOptions.CHANGELOG_ENABLED.key(), true); + testRead(optionMap, expectedUnMergedResult); + } + + @Test + public void testMergeOnReadInputFormatMergeIterator() throws Exception { + OptionMap optionMap = defaultOptionMap(tempFile.getAbsolutePath()); + optionMap.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + optionMap.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1); + testRead(optionMap, true); + } + + @Test + public void testMergeOnReadInputFormatSkipMergeIterator() throws Exception { + OptionMap optionMap = defaultOptionMap(tempFile.getAbsolutePath()); + optionMap.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + optionMap.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1); + optionMap.put(FlinkOptions.MERGE_TYPE.key(), FlinkOptions.REALTIME_SKIP_MERGE); + testRead(optionMap, true, expectedUnMergedResult); + } + + @SuppressWarnings({"SqlDialectInspection", "SqlNoDataSourceInspection"}) + @Test + public void testCompaction() throws Exception { + OptionMap optionMap = defaultOptionMap(tempFile.getAbsolutePath()); + optionMap.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + optionMap.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1); + testRead(optionMap, new String[0]); + try (HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(optionMap.toConfig())) { + Option compactionInstant = writeClient.scheduleCompaction(Option.empty()); + writeClient.compact(compactionInstant.get()); + } + //language=SQL + TableResult tableResult = tEnv.executeSql("select first_name, salary, age from t1"); + checkAnswer(tableResult, expectedMergedResult); + } + + private void testRead(OptionMap optionMap) throws Exception { + testRead(optionMap, false); + } + + private void testRead(OptionMap optionMap, String... expectedResult) throws Exception { + testRead(optionMap, false, expectedResult); + } + + private void testRead(OptionMap optionMap, boolean shouldCompact) throws Exception { + testRead(optionMap, shouldCompact, expectedMergedResult); + } + + /** + * 1) Write data with schema1 + * 2) Compaction (optional) + * 3) Evolution schema1 => schema2 + * 4) Write data with schema2 + * 5) Read all data + */ + @SuppressWarnings({"SqlDialectInspection", "SqlNoDataSourceInspection"}) + private void testRead(OptionMap optionMap, boolean shouldCompact, String... 
expectedResult) throws Exception { + //language=SQL + tEnv.executeSql("" + + "create table t1 (" + + " uuid string," + + " name string," + + " age int," + + " ts timestamp," + + " `partition` string" + + ") partitioned by (`partition`) with (" + optionMap + ")" + ); + //language=SQL + tEnv.executeSql("" + + "insert into t1 select " + + " cast(uuid as string)," + + " cast(name as string)," + + " cast(age as int)," + + " cast(ts as timestamp)," + + " cast(`partition` as string) " + + "from (values " + + " ('id1', 'Danny', 23, '2000-01-01 00:00:01', 'par1')," + + " ('id2', 'Stephen', 33, '2000-01-01 00:00:02', 'par1')," + + " ('id3', 'Julian', 53, '2000-01-01 00:00:03', 'par2')," + + " ('id4', 'Fabian', 31, '2000-01-01 00:00:04', 'par2')," + + " ('id5', 'Sophia', 18, '2000-01-01 00:00:05', 'par3')," + + " ('id6', 'Emma', 20, '2000-01-01 00:00:06', 'par3')," + + " ('id7', 'Bob', 44, '2000-01-01 00:00:07', 'par4')," + + " ('id8', 'Han', 56, '2000-01-01 00:00:08', 'par4')" + + ") as A(uuid, name, age, ts, `partition`)" + ).await(); + + try (HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(optionMap.toConfig())) { + if (shouldCompact) { + Option compactionInstant = writeClient.scheduleCompaction(Option.empty()); + writeClient.compact(compactionInstant.get()); + } + Schema doubleType = SchemaBuilder.unionOf().nullType().and().doubleType().endUnion(); + writeClient.addColumn("salary", doubleType, null, "age", AFTER); + writeClient.renameColumn("name", "first_name"); + writeClient.updateColumnType("age", Types.StringType.get()); + } + + tEnv.executeSql("drop table t1"); + optionMap.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION)); + + //language=SQL + tEnv.executeSql("" + + "create table t1 (" + + " uuid string," + + " first_name string," + + " age string," + + " salary double," + + " ts timestamp," + + " `partition` string" + + ") partitioned by (`partition`) with (" + optionMap + ")" + ); + //language=SQL + tEnv.executeSql("" + + "insert into t1 select " + + " cast(uuid as string)," + + " cast(first_name as string)," + + " cast(age as string)," + + " cast(salary as double)," + + " cast(ts as timestamp)," + + " cast(`partition` as string) " + + "from (values " + + " ('id1', 'Danny', '23', 10000.1, '2000-01-01 00:00:01', 'par1')," + + " ('id9', 'Alice', 'unknown', 90000.9, '2000-01-01 00:00:09', 'par1')," + + " ('id3', 'Julian', '53', 30000.3, '2000-01-01 00:00:03', 'par2')" + + ") as A(uuid, first_name, age, salary, ts, `partition`)" + ).await(); + + TableResult tableResult = tEnv.executeSql("select first_name, salary, age from t1"); + checkAnswer(tableResult, expectedResult); + } + + private OptionMap defaultOptionMap(String tablePath) { + return new OptionMap( + FactoryUtil.CONNECTOR.key(), HoodieTableFactory.FACTORY_ID, + FlinkOptions.PATH.key(), tablePath, + FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_COPY_ON_WRITE, + HoodieTableConfig.NAME.key(), "t1", + FlinkOptions.READ_AS_STREAMING.key(), false, + FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_SNAPSHOT, + KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid", + KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition", + KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true, + HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName(), + FlinkOptions.WRITE_BATCH_SIZE.key(), 0.000001, // trigger flush after each record + FlinkOptions.SOURCE_AVRO_SCHEMA.key(), AvroSchemaConverter.convertToSchema(ROW_TYPE), + 
FlinkOptions.READ_TASKS.key(), 1, + FlinkOptions.WRITE_TASKS.key(), 1, + FlinkOptions.INDEX_BOOTSTRAP_TASKS.key(), 1, + FlinkOptions.BUCKET_ASSIGN_TASKS.key(), 1, + FlinkOptions.COMPACTION_TASKS.key(), 1, + FlinkOptions.SCHEMA_EVOLUTION_ENABLED.key(), true); + } + + private void checkAnswer(TableResult actualResult, String... expectedResult) { + Set expected = new HashSet<>(Arrays.asList(expectedResult)); + Set actual = new HashSet<>(expected.size()); + try (CloseableIterator iterator = actualResult.collect()) { + for (int i = 0; i < expected.size() && iterator.hasNext(); i++) { + actual.add(iterator.next().toString()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + assertEquals(expected, actual); + } + + private static final class OptionMap { + private final Map map = new HashMap<>(); + + OptionMap(Object... options) { + Preconditions.checkArgument(options.length % 2 == 0); + for (int i = 0; i < options.length; i += 2) { + String key = Objects.toString(options[i]); + String value = Objects.toString(options[i + 1]); + map.put(key, value); + } + } + + void put(Object key, Object value) { + map.put(Objects.toString(key), Objects.toString(value)); + } + + Configuration toConfig() { + return FlinkOptions.fromMap(map); + } + + @Override + public String toString() { + return map.entrySet().stream() + .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue())) + .collect(Collectors.joining(", ")); + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestCastMap.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestCastMap.java new file mode 100644 index 000000000000..47254d2af94d --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestCastMap.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; + +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.time.LocalDate; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for {@link CastMap}. 
+ */ +public class TestCastMap { + + @Test + public void testCastInt() { + CastMap castMap = new CastMap(); + castMap.add(0, new IntType(), new BigIntType()); + castMap.add(1, new IntType(), new FloatType()); + castMap.add(2, new IntType(), new DoubleType()); + castMap.add(3, new IntType(), new DecimalType()); + castMap.add(4, new IntType(), new VarCharType()); + int val = 1; + assertEquals(1L, castMap.castIfNeeded(0, val)); + assertEquals(1.0F, castMap.castIfNeeded(1, val)); + assertEquals(1.0, castMap.castIfNeeded(2, val)); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(3, val)); + assertEquals(BinaryStringData.fromString("1"), castMap.castIfNeeded(4, val)); + } + + @Test + public void testCastLong() { + CastMap castMap = new CastMap(); + castMap.add(0, new BigIntType(), new FloatType()); + castMap.add(1, new BigIntType(), new DoubleType()); + castMap.add(2, new BigIntType(), new DecimalType()); + castMap.add(3, new BigIntType(), new VarCharType()); + long val = 1L; + assertEquals(1.0F, castMap.castIfNeeded(0, val)); + assertEquals(1.0, castMap.castIfNeeded(1, val)); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(2, val)); + assertEquals(BinaryStringData.fromString("1"), castMap.castIfNeeded(3, val)); + } + + @Test + public void testCastFloat() { + CastMap castMap = new CastMap(); + castMap.add(0, new FloatType(), new DoubleType()); + castMap.add(1, new FloatType(), new DecimalType()); + castMap.add(2, new FloatType(), new VarCharType()); + float val = 1F; + assertEquals(1.0, castMap.castIfNeeded(0, val)); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(1, val)); + assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeeded(2, val)); + } + + @Test + public void testCastDouble() { + CastMap castMap = new CastMap(); + castMap.add(0, new DoubleType(), new DecimalType()); + castMap.add(1, new DoubleType(), new VarCharType()); + double val = 1; + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(0, val)); + assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeeded(1, val)); + } + + @Test + public void testCastDecimal() { + CastMap castMap = new CastMap(); + castMap.add(0, new DecimalType(2, 1), new DecimalType(3, 2)); + castMap.add(1, new DecimalType(), new VarCharType()); + DecimalData val = DecimalData.fromBigDecimal(BigDecimal.ONE, 2, 1); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 3, 2), castMap.castIfNeeded(0, val)); + assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeeded(1, val)); + } + + @Test + public void testCastString() { + CastMap castMap = new CastMap(); + castMap.add(0, new VarCharType(), new DecimalType()); + castMap.add(1, new VarCharType(), new DateType()); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(0, BinaryStringData.fromString("1.0"))); + assertEquals((int) LocalDate.parse("2022-05-12").toEpochDay(), castMap.castIfNeeded(1, BinaryStringData.fromString("2022-05-12"))); + } + + @Test + public void testCastDate() { + CastMap castMap = new CastMap(); + castMap.add(0, new DateType(), new VarCharType()); + assertEquals(BinaryStringData.fromString("2022-05-12"), castMap.castIfNeeded(0, (int) LocalDate.parse("2022-05-12").toEpochDay())); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index a5b7e368a885..03e2c6b7d512 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -84,6 +84,16 @@ private TestConfigurations() { public static final RowType ROW_TYPE_DATE = (RowType) ROW_DATA_TYPE_DATE.getLogicalType(); + public static final DataType ROW_DATA_TYPE_EVOLUTION = DataTypes.ROW( + DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)), + DataTypes.FIELD("first_name", DataTypes.VARCHAR(10)), // renamed + DataTypes.FIELD("age", DataTypes.VARCHAR(10)), // changed type + DataTypes.FIELD("salary", DataTypes.DOUBLE()), // new field + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))).notNull(); + + public static final RowType ROW_TYPE_EVOLUTION = (RowType) ROW_DATA_TYPE_EVOLUTION.getLogicalType(); + public static String getCreateHoodieTableDDL(String tableName, Map options) { return getCreateHoodieTableDDL(tableName, options, true, "partition"); }
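For reviewers skimming the patch, here is a minimal sketch of how the two new pieces compose: a CastMap records, per output position, the conversion from the type stored in the data files to the type declared by the evolved query schema, and RowDataCastProjection applies those conversions while projecting a row. The sketch is not part of the change; the class name, package placement (next to TestCastMap) and sample values are invented for illustration, and it only relies on constructors and methods that appear in this diff (new CastMap(), CastMap#add, RowDataCastProjection#project).

// Illustrative sketch only; assumes the API surface shown in this diff.
package org.apache.hudi.table.format; // hypothetical location, chosen to sit beside TestCastMap

import org.apache.hudi.util.RowDataCastProjection;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;

public class CastOnReadSketch {
  public static void main(String[] args) {
    // Position 0 was written as INT, but the evolved query schema declares it as BIGINT.
    CastMap castMap = new CastMap();
    castMap.add(0, new IntType(), new BigIntType());

    // The projection reads the row with the file's actual type and casts position by position.
    LogicalType[] actualTypes = {new IntType()};
    int[] positions = {0};
    RowDataCastProjection projection = new RowDataCastProjection(actualTypes, positions, castMap);

    RowData fileRow = GenericRowData.of(42);      // row as it exists in the pre-evolution file
    RowData queryRow = projection.project(fileRow);
    System.out.println(queryRow.getLong(0));      // 42, now exposed as BIGINT
  }
}

This mirrors getCastProjection in the input-format changes above: the record iterator is wrapped in a ProjectionIterator only when the CastMap actually covers one of the required positions (castMap.containsAnyPos(requiredPos)).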